Merge commit 'origin/master-tx'
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / protocol-esound.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <errno.h>
28 #include <string.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <limits.h>
32
33 #include <pulse/sample.h>
34 #include <pulse/timeval.h>
35 #include <pulse/utf8.h>
36 #include <pulse/xmalloc.h>
37 #include <pulse/proplist.h>
38
39 #include <pulsecore/esound.h>
40 #include <pulsecore/memblock.h>
41 #include <pulsecore/client.h>
42 #include <pulsecore/sink-input.h>
43 #include <pulsecore/sink.h>
44 #include <pulsecore/source-output.h>
45 #include <pulsecore/source.h>
46 #include <pulsecore/core-scache.h>
47 #include <pulsecore/sample-util.h>
48 #include <pulsecore/authkey.h>
49 #include <pulsecore/namereg.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/core-util.h>
52 #include <pulsecore/core-error.h>
53 #include <pulsecore/ipacl.h>
54 #include <pulsecore/macro.h>
55 #include <pulsecore/thread-mq.h>
56 #include <pulsecore/shared.h>
57
58 #include "endianmacros.h"
59
60 #include "protocol-esound.h"
61
62 /* Don't accept more connection than this */
63 #define MAX_CONNECTIONS 64
64
65 /* Kick a client if it doesn't authenticate within this time */
66 #define AUTH_TIMEOUT 5
67
68 #define DEFAULT_COOKIE_FILE ".esd_auth"
69
70 #define PLAYBACK_BUFFER_SECONDS (.25)
71 #define PLAYBACK_BUFFER_FRAGMENTS (10)
72 #define RECORD_BUFFER_SECONDS (5)
73
74 #define MAX_CACHE_SAMPLE_SIZE (2048000)
75
76 #define DEFAULT_SINK_LATENCY (150*PA_USEC_PER_MSEC)
77 #define DEFAULT_SOURCE_LATENCY (150*PA_USEC_PER_MSEC)
78
79 #define SCACHE_PREFIX "esound."
80
81 /* This is heavily based on esound's code */
82
83 typedef struct connection {
84     pa_msgobject parent;
85
86     uint32_t index;
87     pa_bool_t dead;
88     pa_esound_protocol *protocol;
89     pa_esound_options *options;
90     pa_iochannel *io;
91     pa_client *client;
92     pa_bool_t authorized, swap_byte_order;
93     void *write_data;
94     size_t write_data_alloc, write_data_index, write_data_length;
95     void *read_data;
96     size_t read_data_alloc, read_data_length;
97     esd_proto_t request;
98     esd_client_state_t state;
99     pa_sink_input *sink_input;
100     pa_source_output *source_output;
101     pa_memblockq *input_memblockq, *output_memblockq;
102     pa_defer_event *defer_event;
103
104     char *original_name;
105
106     struct {
107         pa_memblock *current_memblock;
108         size_t memblock_index;
109         pa_atomic_t missing;
110         pa_bool_t underrun;
111     } playback;
112
113     struct {
114         pa_memchunk memchunk;
115         char *name;
116         pa_sample_spec sample_spec;
117     } scache;
118
119     pa_time_event *auth_timeout_event;
120 } connection;
121
122 PA_DECLARE_CLASS(connection);
123 #define CONNECTION(o) (connection_cast(o))
124 static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
125
126 struct pa_esound_protocol {
127     PA_REFCNT_DECLARE;
128
129     pa_core *core;
130     pa_idxset *connections;
131     unsigned n_player;
132 };
133
134 enum {
135     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
136     SINK_INPUT_MESSAGE_DISABLE_PREBUF
137 };
138
139 enum {
140     CONNECTION_MESSAGE_REQUEST_DATA,
141     CONNECTION_MESSAGE_POST_DATA,
142     CONNECTION_MESSAGE_UNLINK_CONNECTION
143 };
144
145 typedef struct proto_handler {
146     size_t data_length;
147     int (*proc)(connection *c, esd_proto_t request, const void *data, size_t length);
148     const char *description;
149 } esd_proto_handler_info_t;
150
151 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
152 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
153 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
154 static void sink_input_kill_cb(pa_sink_input *i);
155 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
156 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
157
158 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
159 static void source_output_kill_cb(pa_source_output *o);
160
161 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length);
162 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length);
163 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length);
164 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length);
165 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length);
166 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length);
167 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length);
168 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length);
169 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length);
170 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length);
171 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length);
172 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length);
173 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length);
174
175 /* the big map of protocol handler info */
176 static struct proto_handler proto_map[ESD_PROTO_MAX] = {
177     { ESD_KEY_LEN + sizeof(int),      esd_proto_connect, "connect" },
178     { ESD_KEY_LEN + sizeof(int),      NULL, "lock" },
179     { ESD_KEY_LEN + sizeof(int),      NULL, "unlock" },
180
181     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_play, "stream play" },
182     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream rec" },
183     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream mon" },
184
185     { ESD_NAME_MAX + 3 * sizeof(int), esd_proto_sample_cache, "sample cache" },                      /* 6 */
186     { sizeof(int),                    esd_proto_sample_free_or_play, "sample free" },
187     { sizeof(int),                    esd_proto_sample_free_or_play, "sample play" },                /* 8 */
188     { sizeof(int),                    NULL, "sample loop" },
189     { sizeof(int),                    NULL, "sample stop" },
190     { (size_t) -1,                    NULL, "TODO: sample kill" },
191
192     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "standby" },
193     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "resume" },                       /* 13 */
194
195     { ESD_NAME_MAX,                   esd_proto_sample_get_id, "sample getid" },                     /* 14 */
196     { ESD_NAME_MAX + 2 * sizeof(int), NULL, "stream filter" },
197
198     { sizeof(int),                    esd_proto_server_info, "server info" },
199     { sizeof(int),                    esd_proto_all_info, "all info" },
200     { (size_t) -1,                    NULL, "TODO: subscribe" },
201     { (size_t) -1,                    NULL, "TODO: unsubscribe" },
202
203     { 3 * sizeof(int),                esd_proto_stream_pan, "stream pan"},
204     { 3 * sizeof(int),                esd_proto_sample_pan, "sample pan" },
205
206     { sizeof(int),                    esd_proto_standby_mode, "standby mode" },
207     { 0,                              esd_proto_get_latency, "get latency" }
208 };
209
210 static void connection_unlink(connection *c) {
211     pa_assert(c);
212
213     if (!c->protocol)
214         return;
215
216     if (c->options) {
217         pa_esound_options_unref(c->options);
218         c->options = NULL;
219     }
220
221     if (c->sink_input) {
222         pa_sink_input_unlink(c->sink_input);
223         pa_sink_input_unref(c->sink_input);
224         c->sink_input = NULL;
225     }
226
227     if (c->source_output) {
228         pa_source_output_unlink(c->source_output);
229         pa_source_output_unref(c->source_output);
230         c->source_output = NULL;
231     }
232
233     if (c->client) {
234         pa_client_free(c->client);
235         c->client = NULL;
236     }
237
238     if (c->state == ESD_STREAMING_DATA)
239         c->protocol->n_player--;
240
241     if (c->io) {
242         pa_iochannel_free(c->io);
243         c->io = NULL;
244     }
245
246     if (c->defer_event) {
247         c->protocol->core->mainloop->defer_free(c->defer_event);
248         c->defer_event = NULL;
249     }
250
251     if (c->auth_timeout_event) {
252         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
253         c->auth_timeout_event = NULL;
254     }
255
256     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
257     c->protocol = NULL;
258     connection_unref(c);
259 }
260
261 static void connection_free(pa_object *obj) {
262     connection *c = CONNECTION(obj);
263     pa_assert(c);
264
265     if (c->input_memblockq)
266         pa_memblockq_free(c->input_memblockq);
267     if (c->output_memblockq)
268         pa_memblockq_free(c->output_memblockq);
269
270     if (c->playback.current_memblock)
271         pa_memblock_unref(c->playback.current_memblock);
272
273     pa_xfree(c->read_data);
274     pa_xfree(c->write_data);
275
276     if (c->scache.memchunk.memblock)
277         pa_memblock_unref(c->scache.memchunk.memblock);
278     pa_xfree(c->scache.name);
279
280     pa_xfree(c->original_name);
281     pa_xfree(c);
282 }
283
284 static void connection_write_prepare(connection *c, size_t length) {
285     size_t t;
286     pa_assert(c);
287
288     t = c->write_data_length+length;
289
290     if (c->write_data_alloc < t)
291         c->write_data = pa_xrealloc(c->write_data, c->write_data_alloc = t);
292
293     pa_assert(c->write_data);
294 }
295
296 static void connection_write(connection *c, const void *data, size_t length) {
297     size_t i;
298     pa_assert(c);
299
300     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
301
302     connection_write_prepare(c, length);
303
304     pa_assert(c->write_data);
305
306     i = c->write_data_length;
307     c->write_data_length += length;
308
309     memcpy((uint8_t*) c->write_data + i, data, length);
310 }
311
312 static void format_esd2native(int format, pa_bool_t swap_bytes, pa_sample_spec *ss) {
313     pa_assert(ss);
314
315     ss->channels = (uint8_t) (((format & ESD_MASK_CHAN) == ESD_STEREO) ? 2 : 1);
316     if ((format & ESD_MASK_BITS) == ESD_BITS16)
317         ss->format = swap_bytes ? PA_SAMPLE_S16RE : PA_SAMPLE_S16NE;
318     else
319         ss->format = PA_SAMPLE_U8;
320 }
321
322 static int format_native2esd(pa_sample_spec *ss) {
323     int format = 0;
324
325     format = (ss->format == PA_SAMPLE_U8) ? ESD_BITS8 : ESD_BITS16;
326     format |= (ss->channels >= 2) ? ESD_STEREO : ESD_MONO;
327
328     return format;
329 }
330
331 #define CHECK_VALIDITY(expression, ...) do { \
332     if (!(expression)) { \
333         pa_log_warn(__FILE__ ": " __VA_ARGS__); \
334         return -1; \
335     } \
336 } while(0);
337
338 /*** esound commands ***/
339
340 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length) {
341     uint32_t ekey;
342     int ok;
343
344     connection_assert_ref(c);
345     pa_assert(data);
346     pa_assert(length == (ESD_KEY_LEN + sizeof(uint32_t)));
347
348     if (!c->authorized && c->options->auth_cookie) {
349         const uint8_t*key;
350
351         if ((key = pa_auth_cookie_read(c->options->auth_cookie, ESD_KEY_LEN)))
352             if (memcmp(data, key, ESD_KEY_LEN) == 0)
353                 c->authorized = TRUE;
354     }
355
356     if (!c->authorized) {
357         pa_log("Kicked client with invalid authorization key.");
358         return -1;
359     }
360
361     if (c->auth_timeout_event) {
362         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
363         c->auth_timeout_event = NULL;
364     }
365
366     data = (const char*)data + ESD_KEY_LEN;
367
368     memcpy(&ekey, data, sizeof(uint32_t));
369     if (ekey == ESD_ENDIAN_KEY)
370         c->swap_byte_order = FALSE;
371     else if (ekey == ESD_SWAP_ENDIAN_KEY)
372         c->swap_byte_order = TRUE;
373     else {
374         pa_log_warn("Client sent invalid endian key");
375         return -1;
376     }
377
378     pa_proplist_sets(c->client->proplist, "esound.byte_order", c->swap_byte_order ? "reverse" : "native");
379
380     ok = 1;
381     connection_write(c, &ok, sizeof(int));
382     return 0;
383 }
384
385 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length) {
386     char name[ESD_NAME_MAX], *utf8_name;
387     int32_t format, rate;
388     pa_sample_spec ss;
389     size_t l;
390     pa_sink *sink = NULL;
391     pa_sink_input_new_data sdata;
392
393     connection_assert_ref(c);
394     pa_assert(data);
395     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
396
397     memcpy(&format, data, sizeof(int32_t));
398     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
399     data = (const char*) data + sizeof(int32_t);
400
401     memcpy(&rate, data, sizeof(int32_t));
402     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
403     data = (const char*) data + sizeof(int32_t);
404
405     ss.rate = (uint32_t) rate;
406     format_esd2native(format, c->swap_byte_order, &ss);
407
408     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification");
409
410     if (c->options->default_sink) {
411         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
412         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
413     }
414
415     pa_strlcpy(name, data, sizeof(name));
416
417     utf8_name = pa_utf8_filter(name);
418     pa_client_set_name(c->client, utf8_name);
419     pa_xfree(utf8_name);
420
421     c->original_name = pa_xstrdup(name);
422
423     pa_assert(!c->sink_input && !c->input_memblockq);
424
425     pa_sink_input_new_data_init(&sdata);
426     sdata.driver = __FILE__;
427     sdata.module = c->options->module;
428     sdata.client = c->client;
429     sdata.sink = sink;
430     pa_sink_input_new_data_set_sample_spec(&sdata, &ss);
431
432     pa_sink_input_new(&c->sink_input, c->protocol->core, &sdata, 0);
433     pa_sink_input_new_data_done(&sdata);
434
435     CHECK_VALIDITY(c->sink_input, "Failed to create sink input.");
436
437     l = (size_t) ((double) pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS);
438     c->input_memblockq = pa_memblockq_new(
439             0,
440             l,
441             l,
442             pa_frame_size(&ss),
443             (size_t) -1,
444             l/PLAYBACK_BUFFER_FRAGMENTS,
445             0,
446             NULL);
447     pa_iochannel_socket_set_rcvbuf(c->io, l);
448
449     c->sink_input->parent.process_msg = sink_input_process_msg;
450     c->sink_input->pop = sink_input_pop_cb;
451     c->sink_input->process_rewind = sink_input_process_rewind_cb;
452     c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
453     c->sink_input->kill = sink_input_kill_cb;
454     c->sink_input->userdata = c;
455
456     pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY);
457
458     c->state = ESD_STREAMING_DATA;
459
460     c->protocol->n_player++;
461
462     pa_atomic_store(&c->playback.missing, (int) pa_memblockq_missing(c->input_memblockq));
463
464     pa_sink_input_put(c->sink_input);
465
466     return 0;
467 }
468
469 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length) {
470     char name[ESD_NAME_MAX], *utf8_name;
471     int32_t format, rate;
472     pa_source *source = NULL;
473     pa_sample_spec ss;
474     size_t l;
475     pa_source_output_new_data sdata;
476
477     connection_assert_ref(c);
478     pa_assert(data);
479     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
480
481     memcpy(&format, data, sizeof(int32_t));
482     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
483     data = (const char*) data + sizeof(int32_t);
484
485     memcpy(&rate, data, sizeof(int32_t));
486     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
487     data = (const char*) data + sizeof(int32_t);
488
489     ss.rate = (uint32_t) rate;
490     format_esd2native(format, c->swap_byte_order, &ss);
491
492     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
493
494     if (request == ESD_PROTO_STREAM_MON) {
495         pa_sink* sink;
496
497         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
498         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
499
500         source = sink->monitor_source;
501         CHECK_VALIDITY(source, "No such source.");
502     } else {
503         pa_assert(request == ESD_PROTO_STREAM_REC);
504
505         if (c->options->default_source) {
506             source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE);
507             CHECK_VALIDITY(source, "No such source: %s", pa_strnull(c->options->default_source));
508         }
509     }
510
511     pa_strlcpy(name, data, sizeof(name));
512
513     utf8_name = pa_utf8_filter(name);
514     pa_client_set_name(c->client, utf8_name);
515     pa_xfree(utf8_name);
516
517     c->original_name = pa_xstrdup(name);
518
519     pa_assert(!c->output_memblockq && !c->source_output);
520
521     pa_source_output_new_data_init(&sdata);
522     sdata.driver = __FILE__;
523     sdata.module = c->options->module;
524     sdata.client = c->client;
525     sdata.source = source;
526     pa_source_output_new_data_set_sample_spec(&sdata, &ss);
527
528     pa_source_output_new(&c->source_output, c->protocol->core, &sdata, 0);
529     pa_source_output_new_data_done(&sdata);
530
531     CHECK_VALIDITY(c->source_output, "Failed to create source output.");
532
533     l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS);
534     c->output_memblockq = pa_memblockq_new(
535             0,
536             l,
537             l,
538             pa_frame_size(&ss),
539             1,
540             0,
541             0,
542             NULL);
543     pa_iochannel_socket_set_sndbuf(c->io, l);
544
545     c->source_output->push = source_output_push_cb;
546     c->source_output->kill = source_output_kill_cb;
547     c->source_output->get_latency = source_output_get_latency_cb;
548     c->source_output->userdata = c;
549
550     pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY);
551
552     c->state = ESD_STREAMING_DATA;
553
554     c->protocol->n_player++;
555
556     pa_source_output_put(c->source_output);
557
558     return 0;
559 }
560
561 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length) {
562     pa_sink *sink;
563     int32_t latency;
564
565     connection_assert_ref(c);
566     pa_assert(!data);
567     pa_assert(length == 0);
568
569     if (!(sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
570         latency = 0;
571     else {
572         double usec = (double) pa_sink_get_requested_latency(sink);
573         latency = (int) ((usec*44100)/1000000);
574     }
575
576     latency = PA_MAYBE_INT32_SWAP(c->swap_byte_order, latency);
577     connection_write(c, &latency, sizeof(int32_t));
578
579     return 0;
580 }
581
582 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length) {
583     int32_t rate = 44100, format = ESD_STEREO|ESD_BITS16;
584     int32_t response;
585     pa_sink *sink;
586
587     connection_assert_ref(c);
588     pa_assert(data);
589     pa_assert(length == sizeof(int32_t));
590
591     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK))) {
592         rate = (int32_t) sink->sample_spec.rate;
593         format = format_native2esd(&sink->sample_spec);
594     }
595
596     connection_write_prepare(c, sizeof(int32_t) * 3);
597
598     response = 0;
599     connection_write(c, &response, sizeof(int32_t));
600     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
601     connection_write(c, &rate, sizeof(int32_t));
602     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
603     connection_write(c, &format, sizeof(int32_t));
604
605     return 0;
606 }
607
608 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length) {
609     size_t t, k, s;
610     connection *conn;
611     uint32_t idx = PA_IDXSET_INVALID;
612     unsigned nsamples;
613     char terminator[sizeof(int32_t)*6+ESD_NAME_MAX];
614
615     connection_assert_ref(c);
616     pa_assert(data);
617     pa_assert(length == sizeof(int32_t));
618
619     if (esd_proto_server_info(c, request, data, length) < 0)
620         return -1;
621
622     k = sizeof(int32_t)*5+ESD_NAME_MAX;
623     s = sizeof(int32_t)*6+ESD_NAME_MAX;
624     nsamples = pa_idxset_size(c->protocol->core->scache);
625     t = s*(nsamples+1) + k*(c->protocol->n_player+1);
626
627     connection_write_prepare(c, t);
628
629     memset(terminator, 0, sizeof(terminator));
630
631     for (conn = pa_idxset_first(c->protocol->connections, &idx); conn; conn = pa_idxset_next(c->protocol->connections, &idx)) {
632         int32_t id, format = ESD_BITS16 | ESD_STEREO, rate = 44100, lvolume = ESD_VOLUME_BASE, rvolume = ESD_VOLUME_BASE;
633         char name[ESD_NAME_MAX];
634
635         if (conn->state != ESD_STREAMING_DATA)
636             continue;
637
638         pa_assert(t >= k*2+s);
639
640         if (conn->sink_input) {
641             pa_cvolume volume;
642             pa_sink_input_get_volume(conn->sink_input, &volume, TRUE);
643             rate = (int32_t) conn->sink_input->sample_spec.rate;
644             lvolume = (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
645             rvolume = (int32_t) ((volume.values[volume.channels == 2 ? 1 : 0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
646             format = format_native2esd(&conn->sink_input->sample_spec);
647         }
648
649         /* id */
650         id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) (conn->index+1));
651         connection_write(c, &id, sizeof(int32_t));
652
653         /* name */
654         memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
655         if (conn->original_name)
656             strncpy(name, conn->original_name, ESD_NAME_MAX);
657         else if (conn->client && pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME))
658             strncpy(name, pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME), ESD_NAME_MAX);
659         connection_write(c, name, ESD_NAME_MAX);
660
661         /* rate */
662         rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
663         connection_write(c, &rate, sizeof(int32_t));
664
665         /* left */
666         lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, lvolume);
667         connection_write(c, &lvolume, sizeof(int32_t));
668
669         /*right*/
670         rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rvolume);
671         connection_write(c, &rvolume, sizeof(int32_t));
672
673         /*format*/
674         format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
675         connection_write(c, &format, sizeof(int32_t));
676
677         t -= k;
678     }
679
680     pa_assert(t == s*(nsamples+1)+k);
681     t -= k;
682
683     connection_write(c, terminator, k);
684
685     if (nsamples) {
686         pa_scache_entry *ce;
687
688         idx = PA_IDXSET_INVALID;
689         for (ce = pa_idxset_first(c->protocol->core->scache, &idx); ce; ce = pa_idxset_next(c->protocol->core->scache, &idx)) {
690             int32_t id, rate, lvolume, rvolume, format, len;
691             char name[ESD_NAME_MAX];
692             pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
693             pa_cvolume volume;
694             pa_sample_spec ss;
695
696             pa_assert(t >= s*2);
697
698             if (ce->volume_is_set) {
699                 volume = ce->volume;
700                 pa_cvolume_remap(&volume, &ce->channel_map, &stereo);
701             } else
702                 pa_cvolume_reset(&volume, 2);
703
704             if (ce->memchunk.memblock)
705                 ss = ce->sample_spec;
706             else {
707                 ss.format = PA_SAMPLE_S16NE;
708                 ss.rate = 44100;
709                 ss.channels = 2;
710             }
711
712             /* id */
713             id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1));
714             connection_write(c, &id, sizeof(int32_t));
715
716             /* name */
717             memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
718             if (strncmp(ce->name, SCACHE_PREFIX, sizeof(SCACHE_PREFIX)-1) == 0)
719                 strncpy(name, ce->name+sizeof(SCACHE_PREFIX)-1, ESD_NAME_MAX);
720             else
721                 pa_snprintf(name, ESD_NAME_MAX, "native.%s", ce->name);
722             connection_write(c, name, ESD_NAME_MAX);
723
724             /* rate */
725             rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ss.rate);
726             connection_write(c, &rate, sizeof(int32_t));
727
728             /* left */
729             lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
730             connection_write(c, &lvolume, sizeof(int32_t));
731
732             /*right*/
733             rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[1]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
734             connection_write(c, &rvolume, sizeof(int32_t));
735
736             /*format*/
737             format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format_native2esd(&ss));
738             connection_write(c, &format, sizeof(int32_t));
739
740             /*length*/
741             len = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) ce->memchunk.length);
742             connection_write(c, &len, sizeof(int32_t));
743
744             t -= s;
745         }
746     }
747
748     pa_assert(t == s);
749
750     connection_write(c, terminator, s);
751
752     return 0;
753 }
754
755 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
756     int32_t ok;
757     uint32_t idx, lvolume, rvolume;
758     connection *conn;
759
760     connection_assert_ref(c);
761     pa_assert(data);
762     pa_assert(length == sizeof(int32_t)*3);
763
764     memcpy(&idx, data, sizeof(uint32_t));
765     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
766     data = (const char*)data + sizeof(uint32_t);
767
768     memcpy(&lvolume, data, sizeof(uint32_t));
769     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
770     data = (const char*)data + sizeof(uint32_t);
771
772     memcpy(&rvolume, data, sizeof(uint32_t));
773     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
774     data = (const char*)data + sizeof(uint32_t);
775
776     if ((conn = pa_idxset_get_by_index(c->protocol->connections, idx)) && conn->sink_input) {
777         pa_cvolume volume;
778         volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
779         volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
780         volume.channels = conn->sink_input->sample_spec.channels;
781
782         pa_sink_input_set_volume(conn->sink_input, &volume, TRUE, TRUE);
783         ok = 1;
784     } else
785         ok = 0;
786
787     connection_write(c, &ok, sizeof(int32_t));
788
789     return 0;
790 }
791
792 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
793     int32_t ok = 0;
794     uint32_t idx, lvolume, rvolume;
795     pa_cvolume volume;
796     pa_scache_entry *ce;
797
798     connection_assert_ref(c);
799     pa_assert(data);
800     pa_assert(length == sizeof(int32_t)*3);
801
802     memcpy(&idx, data, sizeof(uint32_t));
803     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
804     data = (const char*)data + sizeof(uint32_t);
805
806     memcpy(&lvolume, data, sizeof(uint32_t));
807     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
808     data = (const char*)data + sizeof(uint32_t);
809
810     memcpy(&rvolume, data, sizeof(uint32_t));
811     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
812     data = (const char*)data + sizeof(uint32_t);
813
814     volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
815     volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
816     volume.channels = 2;
817
818     if ((ce = pa_idxset_get_by_index(c->protocol->core->scache, idx))) {
819         pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
820
821         pa_cvolume_remap(&volume, &stereo, &ce->channel_map);
822         ce->volume = volume;
823         ce->volume_is_set = TRUE;
824         ok = 1;
825     }
826
827     connection_write(c, &ok, sizeof(int32_t));
828
829     return 0;
830 }
831
832 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length) {
833     pa_sample_spec ss;
834     int32_t format, rate, sc_length;
835     uint32_t idx;
836     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
837
838     connection_assert_ref(c);
839     pa_assert(data);
840     pa_assert(length == (ESD_NAME_MAX+3*sizeof(int32_t)));
841
842     memcpy(&format, data, sizeof(int32_t));
843     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
844     data = (const char*)data + sizeof(int32_t);
845
846     memcpy(&rate, data, sizeof(int32_t));
847     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
848     data = (const char*)data + sizeof(int32_t);
849
850     ss.rate = (uint32_t) rate;
851     format_esd2native(format, c->swap_byte_order, &ss);
852
853     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
854
855     memcpy(&sc_length, data, sizeof(int32_t));
856     sc_length = PA_MAYBE_INT32_SWAP(c->swap_byte_order, sc_length);
857     data = (const char*)data + sizeof(int32_t);
858
859     CHECK_VALIDITY(sc_length <= MAX_CACHE_SAMPLE_SIZE, "Sample too large (%d bytes).", (int)sc_length);
860
861     strcpy(name, SCACHE_PREFIX);
862     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
863
864     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
865
866     pa_assert(!c->scache.memchunk.memblock);
867     c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) sc_length);
868     c->scache.memchunk.index = 0;
869     c->scache.memchunk.length = (size_t) sc_length;
870     c->scache.sample_spec = ss;
871     pa_assert(!c->scache.name);
872     c->scache.name = pa_xstrdup(name);
873
874     c->state = ESD_CACHING_SAMPLE;
875
876     pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, c->client->proplist, &idx);
877
878     idx += 1;
879     connection_write(c, &idx, sizeof(uint32_t));
880
881     return 0;
882 }
883
884 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length) {
885     int32_t ok;
886     uint32_t idx;
887     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
888
889     connection_assert_ref(c);
890     pa_assert(data);
891     pa_assert(length == ESD_NAME_MAX);
892
893     strcpy(name, SCACHE_PREFIX);
894     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
895
896     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
897
898     ok = -1;
899     if ((idx = pa_scache_get_id_by_name(c->protocol->core, name)) != PA_IDXSET_INVALID)
900         ok = (int32_t) idx + 1;
901
902     connection_write(c, &ok, sizeof(int32_t));
903
904     return 0;
905 }
906
907 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length) {
908     int32_t ok;
909     const char *name;
910     uint32_t idx;
911
912     connection_assert_ref(c);
913     pa_assert(data);
914     pa_assert(length == sizeof(int32_t));
915
916     memcpy(&idx, data, sizeof(uint32_t));
917     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
918
919     ok = 0;
920
921     if ((name = pa_scache_get_name_by_id(c->protocol->core, idx))) {
922         if (request == ESD_PROTO_SAMPLE_PLAY) {
923             pa_sink *sink;
924
925             if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
926                 if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM, c->client->proplist, NULL) >= 0)
927                     ok = (int32_t) idx + 1;
928         } else {
929             pa_assert(request == ESD_PROTO_SAMPLE_FREE);
930
931             if (pa_scache_remove_item(c->protocol->core, name) >= 0)
932                 ok = (int32_t) idx + 1;
933         }
934     }
935
936     connection_write(c, &ok, sizeof(int32_t));
937
938     return 0;
939 }
940
941 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length) {
942     int32_t ok = 1;
943
944     connection_assert_ref(c);
945
946     connection_write_prepare(c, sizeof(int32_t) * 2);
947     connection_write(c, &ok, sizeof(int32_t));
948
949     if (request == ESD_PROTO_STANDBY)
950         ok = pa_sink_suspend_all(c->protocol->core, TRUE, PA_SUSPEND_USER) >= 0;
951     else {
952         pa_assert(request == ESD_PROTO_RESUME);
953         ok = pa_sink_suspend_all(c->protocol->core, FALSE, PA_SUSPEND_USER) >= 0;
954     }
955
956     connection_write(c, &ok, sizeof(int32_t));
957
958     return 0;
959 }
960
961 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length) {
962     int32_t mode;
963     pa_sink *sink, *source;
964
965     connection_assert_ref(c);
966
967     mode = ESM_RUNNING;
968
969     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
970         if (pa_sink_get_state(sink) == PA_SINK_SUSPENDED)
971             mode = ESM_ON_STANDBY;
972
973     if ((source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE)))
974         if (pa_source_get_state(source) == PA_SOURCE_SUSPENDED)
975             mode = ESM_ON_STANDBY;
976
977     mode = PA_MAYBE_INT32_SWAP(c->swap_byte_order, mode);
978
979     connection_write(c, &mode, sizeof(mode));
980     return 0;
981 }
982
983 /*** client callbacks ***/
984
985 static void client_kill_cb(pa_client *c) {
986     pa_assert(c);
987
988     connection_unlink(CONNECTION(c->userdata));
989 }
990
991 /*** pa_iochannel callbacks ***/
992
993 static int do_read(connection *c) {
994     connection_assert_ref(c);
995
996 /*     pa_log("READ"); */
997
998     if (c->state == ESD_NEXT_REQUEST) {
999         ssize_t r;
1000         pa_assert(c->read_data_length < sizeof(c->request));
1001
1002         if ((r = pa_iochannel_read(c->io,
1003                                    ((uint8_t*) &c->request) + c->read_data_length,
1004                                    sizeof(c->request) - c->read_data_length)) <= 0) {
1005
1006             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1007                 return 0;
1008
1009             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1010             return -1;
1011         }
1012
1013         c->read_data_length += (size_t) r;
1014
1015         if (c->read_data_length >= sizeof(c->request)) {
1016             struct proto_handler *handler;
1017
1018             c->request = PA_MAYBE_INT32_SWAP(c->swap_byte_order, c->request);
1019
1020             if (c->request < ESD_PROTO_CONNECT || c->request >= ESD_PROTO_MAX) {
1021                 pa_log("recieved invalid request.");
1022                 return -1;
1023             }
1024
1025             handler = proto_map+c->request;
1026
1027 /*             pa_log("executing request #%u", c->request); */
1028
1029             if (!handler->proc) {
1030                 pa_log("recieved unimplemented request #%u.", c->request);
1031                 return -1;
1032             }
1033
1034             if (handler->data_length == 0) {
1035                 c->read_data_length = 0;
1036
1037                 if (handler->proc(c, c->request, NULL, 0) < 0)
1038                     return -1;
1039
1040             } else {
1041                 if (c->read_data_alloc < handler->data_length)
1042                     c->read_data = pa_xrealloc(c->read_data, c->read_data_alloc = handler->data_length);
1043                 pa_assert(c->read_data);
1044
1045                 c->state = ESD_NEEDS_REQDATA;
1046                 c->read_data_length = 0;
1047             }
1048         }
1049
1050     } else if (c->state == ESD_NEEDS_REQDATA) {
1051         ssize_t r;
1052         struct proto_handler *handler = proto_map+c->request;
1053
1054         pa_assert(handler->proc);
1055
1056         pa_assert(c->read_data && c->read_data_length < handler->data_length);
1057
1058         if ((r = pa_iochannel_read(c->io,
1059                                    (uint8_t*) c->read_data + c->read_data_length,
1060                                    handler->data_length - c->read_data_length)) <= 0) {
1061
1062             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1063                 return 0;
1064
1065             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1066             return -1;
1067         }
1068
1069         c->read_data_length += (size_t) r;
1070         if (c->read_data_length >= handler->data_length) {
1071             size_t l = c->read_data_length;
1072             pa_assert(handler->proc);
1073
1074             c->state = ESD_NEXT_REQUEST;
1075             c->read_data_length = 0;
1076
1077             if (handler->proc(c, c->request, c->read_data, l) < 0)
1078                 return -1;
1079         }
1080     } else if (c->state == ESD_CACHING_SAMPLE) {
1081         ssize_t r;
1082         void *p;
1083
1084         pa_assert(c->scache.memchunk.memblock);
1085         pa_assert(c->scache.name);
1086         pa_assert(c->scache.memchunk.index < c->scache.memchunk.length);
1087
1088         p = pa_memblock_acquire(c->scache.memchunk.memblock);
1089         r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index);
1090         pa_memblock_release(c->scache.memchunk.memblock);
1091
1092         if (r <= 0) {
1093             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1094                 return 0;
1095
1096             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1097             return -1;
1098         }
1099
1100         c->scache.memchunk.index += (size_t) r;
1101         pa_assert(c->scache.memchunk.index <= c->scache.memchunk.length);
1102
1103         if (c->scache.memchunk.index == c->scache.memchunk.length) {
1104             uint32_t idx;
1105
1106             c->scache.memchunk.index = 0;
1107             pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, c->client->proplist, &idx);
1108
1109             pa_memblock_unref(c->scache.memchunk.memblock);
1110             pa_memchunk_reset(&c->scache.memchunk);
1111
1112             pa_xfree(c->scache.name);
1113             c->scache.name = NULL;
1114
1115             c->state = ESD_NEXT_REQUEST;
1116
1117             idx += 1;
1118             connection_write(c, &idx, sizeof(uint32_t));
1119         }
1120
1121     } else if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1122         pa_memchunk chunk;
1123         ssize_t r;
1124         size_t l;
1125         void *p;
1126         size_t space;
1127
1128         pa_assert(c->input_memblockq);
1129
1130 /*         pa_log("STREAMING_DATA"); */
1131
1132         if (!(l = (size_t) pa_atomic_load(&c->playback.missing)))
1133             return 0;
1134
1135         if (c->playback.current_memblock) {
1136
1137             space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index;
1138
1139             if (space <= 0) {
1140                 pa_memblock_unref(c->playback.current_memblock);
1141                 c->playback.current_memblock = NULL;
1142             }
1143         }
1144
1145         if (!c->playback.current_memblock) {
1146             pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) -1));
1147             c->playback.memblock_index = 0;
1148
1149             space = pa_memblock_get_length(c->playback.current_memblock);
1150         }
1151
1152         if (l > space)
1153             l = space;
1154
1155         p = pa_memblock_acquire(c->playback.current_memblock);
1156         r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l);
1157         pa_memblock_release(c->playback.current_memblock);
1158
1159         if (r <= 0) {
1160
1161             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1162                 return 0;
1163
1164             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1165             return -1;
1166         }
1167
1168         chunk.memblock = c->playback.current_memblock;
1169         chunk.index = c->playback.memblock_index;
1170         chunk.length = (size_t) r;
1171
1172         c->playback.memblock_index += (size_t) r;
1173
1174         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
1175         pa_atomic_sub(&c->playback.missing, (int) r);
1176     }
1177
1178     return 0;
1179 }
1180
1181 static int do_write(connection *c) {
1182     connection_assert_ref(c);
1183
1184 /*     pa_log("WRITE"); */
1185
1186     if (c->write_data_length) {
1187         ssize_t r;
1188
1189         pa_assert(c->write_data_index < c->write_data_length);
1190         if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) {
1191
1192             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1193                 return 0;
1194
1195             pa_log("write(): %s", pa_cstrerror(errno));
1196             return -1;
1197         }
1198
1199         c->write_data_index += (size_t) r;
1200         if (c->write_data_index >= c->write_data_length)
1201             c->write_data_length = c->write_data_index = 0;
1202
1203     } else if (c->state == ESD_STREAMING_DATA && c->source_output) {
1204         pa_memchunk chunk;
1205         ssize_t r;
1206         void *p;
1207
1208         if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
1209             return 0;
1210
1211         pa_assert(chunk.memblock);
1212         pa_assert(chunk.length);
1213
1214         p = pa_memblock_acquire(chunk.memblock);
1215         r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
1216         pa_memblock_release(chunk.memblock);
1217
1218         pa_memblock_unref(chunk.memblock);
1219
1220         if (r < 0) {
1221
1222             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1223                 return 0;
1224
1225             pa_log("write(): %s", pa_cstrerror(errno));
1226             return -1;
1227         }
1228
1229         pa_memblockq_drop(c->output_memblockq, (size_t) r);
1230     }
1231
1232     return 0;
1233 }
1234
1235 static void do_work(connection *c) {
1236     connection_assert_ref(c);
1237
1238     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
1239
1240     if (c->dead)
1241         return;
1242
1243     if (pa_iochannel_is_readable(c->io))
1244         if (do_read(c) < 0)
1245             goto fail;
1246
1247     if (c->state == ESD_STREAMING_DATA && !c->sink_input && pa_iochannel_is_hungup(c->io))
1248         /* In case we are in capture mode we will never call read()
1249          * on the socket, hence we need to detect the hangup manually
1250          * here, instead of simply waiting for read() to return 0. */
1251         goto fail;
1252
1253     if (pa_iochannel_is_writable(c->io))
1254         if (do_write(c) < 0)
1255             goto fail;
1256
1257     return;
1258
1259 fail:
1260
1261     if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1262         c->dead = TRUE;
1263
1264         pa_iochannel_free(c->io);
1265         c->io = NULL;
1266
1267         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
1268     } else
1269         connection_unlink(c);
1270 }
1271
1272 static void io_callback(pa_iochannel*io, void *userdata) {
1273     connection *c = CONNECTION(userdata);
1274
1275     connection_assert_ref(c);
1276     pa_assert(io);
1277
1278     do_work(c);
1279 }
1280
1281 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
1282     connection *c = CONNECTION(userdata);
1283
1284     connection_assert_ref(c);
1285     pa_assert(e);
1286
1287     do_work(c);
1288 }
1289
1290 static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1291     connection *c = CONNECTION(o);
1292     connection_assert_ref(c);
1293
1294     if (!c->protocol)
1295         return -1;
1296
1297     switch (code) {
1298         case CONNECTION_MESSAGE_REQUEST_DATA:
1299             do_work(c);
1300             break;
1301
1302         case CONNECTION_MESSAGE_POST_DATA:
1303 /*             pa_log("got data %u", chunk->length); */
1304             pa_memblockq_push_align(c->output_memblockq, chunk);
1305             do_work(c);
1306             break;
1307
1308         case CONNECTION_MESSAGE_UNLINK_CONNECTION:
1309             connection_unlink(c);
1310             break;
1311     }
1312
1313     return 0;
1314 }
1315
1316 /*** sink_input callbacks ***/
1317
1318 /* Called from thread context */
1319 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1320     pa_sink_input *i = PA_SINK_INPUT(o);
1321     connection*c;
1322
1323     pa_sink_input_assert_ref(i);
1324     c = CONNECTION(i->userdata);
1325     connection_assert_ref(c);
1326
1327     switch (code) {
1328
1329         case SINK_INPUT_MESSAGE_POST_DATA: {
1330             pa_assert(chunk);
1331
1332             /* New data from the main loop */
1333             pa_memblockq_push_align(c->input_memblockq, chunk);
1334
1335             if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) {
1336                 pa_log_debug("Requesting rewind due to end of underrun.");
1337                 pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE, FALSE);
1338             }
1339
1340 /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
1341
1342             return 0;
1343         }
1344
1345         case SINK_INPUT_MESSAGE_DISABLE_PREBUF:
1346             pa_memblockq_prebuf_disable(c->input_memblockq);
1347             return 0;
1348
1349         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1350             pa_usec_t *r = userdata;
1351
1352             *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
1353
1354             /* Fall through, the default handler will add in the extra
1355              * latency added by the resampler */
1356         }
1357
1358         default:
1359             return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1360     }
1361 }
1362
1363 /* Called from thread context */
1364 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
1365     connection*c;
1366
1367     pa_sink_input_assert_ref(i);
1368     c = CONNECTION(i->userdata);
1369     connection_assert_ref(c);
1370     pa_assert(chunk);
1371
1372     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
1373
1374         c->playback.underrun = TRUE;
1375
1376         if (c->dead && pa_sink_input_safe_to_remove(i))
1377             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
1378
1379         return -1;
1380     } else {
1381         size_t m;
1382
1383         chunk->length = PA_MIN(length, chunk->length);
1384
1385         c->playback.underrun = FALSE;
1386
1387         pa_memblockq_drop(c->input_memblockq, chunk->length);
1388         m = pa_memblockq_pop_missing(c->input_memblockq);
1389
1390         if (m > 0)
1391             if (pa_atomic_add(&c->playback.missing, (int) m) <= 0)
1392                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1393
1394         return 0;
1395     }
1396 }
1397
1398 /* Called from thread context */
1399 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1400     connection *c;
1401
1402     pa_sink_input_assert_ref(i);
1403     c = CONNECTION(i->userdata);
1404     connection_assert_ref(c);
1405
1406     /* If we are in an underrun, then we don't rewind */
1407     if (i->thread_info.underrun_for > 0)
1408         return;
1409
1410     pa_memblockq_rewind(c->input_memblockq, nbytes);
1411 }
1412
1413 /* Called from thread context */
1414 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1415     connection *c;
1416
1417     pa_sink_input_assert_ref(i);
1418     c = CONNECTION(i->userdata);
1419     connection_assert_ref(c);
1420
1421     pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);
1422 }
1423
1424 static void sink_input_kill_cb(pa_sink_input *i) {
1425     pa_sink_input_assert_ref(i);
1426
1427     connection_unlink(CONNECTION(i->userdata));
1428 }
1429
1430 /*** source_output callbacks ***/
1431
1432 /* Called from thread context */
1433 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1434     connection *c;
1435
1436     pa_source_output_assert_ref(o);
1437     c = CONNECTION(o->userdata);
1438     pa_assert(c);
1439     pa_assert(chunk);
1440
1441     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1442 }
1443
1444 static void source_output_kill_cb(pa_source_output *o) {
1445     pa_source_output_assert_ref(o);
1446
1447     connection_unlink(CONNECTION(o->userdata));
1448 }
1449
1450 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1451     connection*c;
1452
1453     pa_source_output_assert_ref(o);
1454     c = CONNECTION(o->userdata);
1455     pa_assert(c);
1456
1457     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
1458 }
1459
1460 /*** entry points ***/
1461
1462 static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) {
1463     connection *c = CONNECTION(userdata);
1464
1465     pa_assert(m);
1466     pa_assert(tv);
1467     connection_assert_ref(c);
1468     pa_assert(c->auth_timeout_event == e);
1469
1470     if (!c->authorized)
1471         connection_unlink(c);
1472 }
1473
1474 void pa_esound_protocol_connect(pa_esound_protocol *p, pa_iochannel *io, pa_esound_options *o) {
1475     connection *c;
1476     char pname[128];
1477     pa_client_new_data data;
1478     pa_client *client;
1479
1480     pa_assert(p);
1481     pa_assert(io);
1482     pa_assert(o);
1483
1484     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
1485         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
1486         pa_iochannel_free(io);
1487         return;
1488     }
1489
1490     pa_client_new_data_init(&data);
1491     data.module = o->module;
1492     data.driver = __FILE__;
1493     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
1494     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "EsounD client (%s)", pname);
1495     pa_proplist_sets(data.proplist, "esound-protocol.peer", pname);
1496     client = pa_client_new(p->core, &data);
1497     pa_client_new_data_done(&data);
1498
1499     if (!client)
1500         return;
1501
1502     c = pa_msgobject_new(connection);
1503     c->parent.parent.free = connection_free;
1504     c->parent.process_msg = connection_process_msg;
1505     c->protocol = p;
1506     c->io = io;
1507     pa_iochannel_set_callback(c->io, io_callback, c);
1508
1509     c->client = client;
1510     c->client->kill = client_kill_cb;
1511     c->client->userdata = c;
1512
1513     c->options = pa_esound_options_ref(o);
1514     c->authorized = FALSE;
1515     c->swap_byte_order = FALSE;
1516     c->dead = FALSE;
1517
1518     c->read_data_length = 0;
1519     c->read_data = pa_xmalloc(c->read_data_alloc = proto_map[ESD_PROTO_CONNECT].data_length);
1520
1521     c->write_data_length = c->write_data_index = c->write_data_alloc = 0;
1522     c->write_data = NULL;
1523
1524     c->state = ESD_NEEDS_REQDATA;
1525     c->request = ESD_PROTO_CONNECT;
1526
1527     c->sink_input = NULL;
1528     c->input_memblockq = NULL;
1529
1530     c->source_output = NULL;
1531     c->output_memblockq = NULL;
1532
1533     c->playback.current_memblock = NULL;
1534     c->playback.memblock_index = 0;
1535     c->playback.underrun = TRUE;
1536     pa_atomic_store(&c->playback.missing, 0);
1537
1538     pa_memchunk_reset(&c->scache.memchunk);
1539     c->scache.name = NULL;
1540
1541     c->original_name = NULL;
1542
1543     if (o->auth_anonymous) {
1544         pa_log_info("Client authenticated anonymously.");
1545         c->authorized = TRUE;
1546     }
1547
1548     if (!c->authorized &&
1549         o->auth_ip_acl &&
1550         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
1551
1552         pa_log_info("Client authenticated by IP ACL.");
1553         c->authorized = TRUE;
1554     }
1555
1556     if (!c->authorized) {
1557         struct timeval tv;
1558         pa_gettimeofday(&tv);
1559         tv.tv_sec += AUTH_TIMEOUT;
1560         c->auth_timeout_event = p->core->mainloop->time_new(p->core->mainloop, &tv, auth_timeout, c);
1561     } else
1562         c->auth_timeout_event = NULL;
1563
1564     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
1565     p->core->mainloop->defer_enable(c->defer_event, 0);
1566
1567     pa_idxset_put(p->connections, c, &c->index);
1568 }
1569
1570 void pa_esound_protocol_disconnect(pa_esound_protocol *p, pa_module *m) {
1571     connection *c;
1572     void *state = NULL;
1573
1574     pa_assert(p);
1575     pa_assert(m);
1576
1577     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
1578         if (c->options->module == m)
1579             connection_unlink(c);
1580 }
1581
1582 static pa_esound_protocol* esound_protocol_new(pa_core *c) {
1583     pa_esound_protocol *p;
1584
1585     pa_assert(c);
1586
1587     p = pa_xnew(pa_esound_protocol, 1);
1588     PA_REFCNT_INIT(p);
1589     p->core = c;
1590     p->connections = pa_idxset_new(NULL, NULL);
1591     p->n_player = 0;
1592
1593     pa_assert_se(pa_shared_set(c, "esound-protocol", p) >= 0);
1594
1595     return p;
1596 }
1597
1598 pa_esound_protocol* pa_esound_protocol_get(pa_core *c) {
1599     pa_esound_protocol *p;
1600
1601     if ((p = pa_shared_get(c, "esound-protocol")))
1602         return pa_esound_protocol_ref(p);
1603
1604     return esound_protocol_new(c);
1605 }
1606
1607 pa_esound_protocol* pa_esound_protocol_ref(pa_esound_protocol *p) {
1608     pa_assert(p);
1609     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1610
1611     PA_REFCNT_INC(p);
1612
1613     return p;
1614 }
1615
1616 void pa_esound_protocol_unref(pa_esound_protocol *p) {
1617     connection *c;
1618     pa_assert(p);
1619     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1620
1621     if (PA_REFCNT_DEC(p) > 0)
1622         return;
1623
1624     while ((c = pa_idxset_first(p->connections, NULL)))
1625         connection_unlink(c);
1626
1627     pa_idxset_free(p->connections, NULL, NULL);
1628
1629     pa_assert_se(pa_shared_remove(p->core, "esound-protocol") >= 0);
1630
1631     pa_xfree(p);
1632 }
1633
1634 pa_esound_options* pa_esound_options_new(void) {
1635     pa_esound_options *o;
1636
1637     o = pa_xnew0(pa_esound_options, 1);
1638     PA_REFCNT_INIT(o);
1639
1640     return o;
1641 }
1642
1643 pa_esound_options* pa_esound_options_ref(pa_esound_options *o) {
1644     pa_assert(o);
1645     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1646
1647     PA_REFCNT_INC(o);
1648
1649     return o;
1650 }
1651
1652 void pa_esound_options_unref(pa_esound_options *o) {
1653     pa_assert(o);
1654     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1655
1656     if (PA_REFCNT_DEC(o) > 0)
1657         return;
1658
1659     if (o->auth_ip_acl)
1660         pa_ip_acl_free(o->auth_ip_acl);
1661
1662     if (o->auth_cookie)
1663         pa_auth_cookie_unref(o->auth_cookie);
1664
1665     pa_xfree(o->default_sink);
1666     pa_xfree(o->default_source);
1667
1668     pa_xfree(o);
1669 }
1670
1671 int pa_esound_options_parse(pa_esound_options *o, pa_core *c, pa_modargs *ma) {
1672     pa_bool_t enabled;
1673     const char *acl;
1674
1675     pa_assert(o);
1676     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1677     pa_assert(ma);
1678
1679     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
1680         pa_log("auth-anonymous= expects a boolean argument.");
1681         return -1;
1682     }
1683
1684     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
1685         pa_ip_acl *ipa;
1686
1687         if (!(ipa = pa_ip_acl_new(acl))) {
1688             pa_log("Failed to parse IP ACL '%s'", acl);
1689             return -1;
1690         }
1691
1692         if (o->auth_ip_acl)
1693             pa_ip_acl_free(o->auth_ip_acl);
1694
1695         o->auth_ip_acl = ipa;
1696     }
1697
1698     enabled = TRUE;
1699     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
1700         pa_log("auth-cookie-enabled= expects a boolean argument.");
1701         return -1;
1702     }
1703
1704     if (o->auth_cookie)
1705         pa_auth_cookie_unref(o->auth_cookie);
1706
1707     if (enabled) {
1708         const char *cn;
1709
1710         /* The new name for this is 'auth-cookie', for compat reasons
1711          * we check the old name too */
1712         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
1713             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
1714                 cn = DEFAULT_COOKIE_FILE;
1715
1716         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, ESD_KEY_LEN)))
1717             return -1;
1718
1719     } else
1720         o->auth_cookie = NULL;
1721
1722     pa_xfree(o->default_sink);
1723     o->default_sink = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
1724
1725     pa_xfree(o->default_source);
1726     o->default_source = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
1727
1728     return 0;
1729 }