Merge branch 'master' of ssh://rootserver/home/lennart/git/public/pulseaudio
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-esound.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <errno.h>
28 #include <string.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <limits.h>
32
33 #include <pulse/rtclock.h>
34 #include <pulse/sample.h>
35 #include <pulse/timeval.h>
36 #include <pulse/utf8.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/proplist.h>
39
40 #include <pulsecore/esound.h>
41 #include <pulsecore/memblock.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/sink.h>
45 #include <pulsecore/source-output.h>
46 #include <pulsecore/source.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/authkey.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/ipacl.h>
55 #include <pulsecore/macro.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/shared.h>
58
59 #include "endianmacros.h"
60
61 #include "protocol-esound.h"
62
63 /* Don't accept more connection than this */
64 #define MAX_CONNECTIONS 64
65
66 /* Kick a client if it doesn't authenticate within this time */
67 #define AUTH_TIMEOUT (5*PA_USEC_PER_SEC)
68
69 #define DEFAULT_COOKIE_FILE ".esd_auth"
70
71 #define PLAYBACK_BUFFER_SECONDS (.25)
72 #define PLAYBACK_BUFFER_FRAGMENTS (10)
73 #define RECORD_BUFFER_SECONDS (5)
74
75 #define MAX_CACHE_SAMPLE_SIZE (2048000)
76
77 #define DEFAULT_SINK_LATENCY (150*PA_USEC_PER_MSEC)
78 #define DEFAULT_SOURCE_LATENCY (150*PA_USEC_PER_MSEC)
79
80 #define SCACHE_PREFIX "esound."
81
82 /* This is heavily based on esound's code */
83
84 typedef struct connection {
85     pa_msgobject parent;
86
87     uint32_t index;
88     pa_bool_t dead;
89     pa_esound_protocol *protocol;
90     pa_esound_options *options;
91     pa_iochannel *io;
92     pa_client *client;
93     pa_bool_t authorized, swap_byte_order;
94     void *write_data;
95     size_t write_data_alloc, write_data_index, write_data_length;
96     void *read_data;
97     size_t read_data_alloc, read_data_length;
98     esd_proto_t request;
99     esd_client_state_t state;
100     pa_sink_input *sink_input;
101     pa_source_output *source_output;
102     pa_memblockq *input_memblockq, *output_memblockq;
103     pa_defer_event *defer_event;
104
105     char *original_name;
106
107     struct {
108         pa_memblock *current_memblock;
109         size_t memblock_index;
110         pa_atomic_t missing;
111         pa_bool_t underrun;
112     } playback;
113
114     struct {
115         pa_memchunk memchunk;
116         char *name;
117         pa_sample_spec sample_spec;
118     } scache;
119
120     pa_time_event *auth_timeout_event;
121 } connection;
122
123 PA_DEFINE_PRIVATE_CLASS(connection, pa_msgobject);
124 #define CONNECTION(o) (connection_cast(o))
125
126 struct pa_esound_protocol {
127     PA_REFCNT_DECLARE;
128
129     pa_core *core;
130     pa_idxset *connections;
131     unsigned n_player;
132 };
133
134 enum {
135     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
136     SINK_INPUT_MESSAGE_DISABLE_PREBUF
137 };
138
139 enum {
140     CONNECTION_MESSAGE_REQUEST_DATA,
141     CONNECTION_MESSAGE_POST_DATA,
142     CONNECTION_MESSAGE_UNLINK_CONNECTION
143 };
144
145 typedef struct proto_handler {
146     size_t data_length;
147     int (*proc)(connection *c, esd_proto_t request, const void *data, size_t length);
148     const char *description;
149 } esd_proto_handler_info_t;
150
151 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
152 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
153 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
154 static void sink_input_kill_cb(pa_sink_input *i);
155 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
156 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
157
158 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
159 static void source_output_kill_cb(pa_source_output *o);
160
161 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length);
162 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length);
163 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length);
164 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length);
165 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length);
166 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length);
167 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length);
168 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length);
169 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length);
170 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length);
171 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length);
172 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length);
173 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length);
174
175 /* the big map of protocol handler info */
176 static struct proto_handler proto_map[ESD_PROTO_MAX] = {
177     { ESD_KEY_LEN + sizeof(int),      esd_proto_connect, "connect" },
178     { ESD_KEY_LEN + sizeof(int),      NULL, "lock" },
179     { ESD_KEY_LEN + sizeof(int),      NULL, "unlock" },
180
181     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_play, "stream play" },
182     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream rec" },
183     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream mon" },
184
185     { ESD_NAME_MAX + 3 * sizeof(int), esd_proto_sample_cache, "sample cache" },                      /* 6 */
186     { sizeof(int),                    esd_proto_sample_free_or_play, "sample free" },
187     { sizeof(int),                    esd_proto_sample_free_or_play, "sample play" },                /* 8 */
188     { sizeof(int),                    NULL, "sample loop" },
189     { sizeof(int),                    NULL, "sample stop" },
190     { (size_t) -1,                    NULL, "TODO: sample kill" },
191
192     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "standby" },
193     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "resume" },                       /* 13 */
194
195     { ESD_NAME_MAX,                   esd_proto_sample_get_id, "sample getid" },                     /* 14 */
196     { ESD_NAME_MAX + 2 * sizeof(int), NULL, "stream filter" },
197
198     { sizeof(int),                    esd_proto_server_info, "server info" },
199     { sizeof(int),                    esd_proto_all_info, "all info" },
200     { (size_t) -1,                    NULL, "TODO: subscribe" },
201     { (size_t) -1,                    NULL, "TODO: unsubscribe" },
202
203     { 3 * sizeof(int),                esd_proto_stream_pan, "stream pan"},
204     { 3 * sizeof(int),                esd_proto_sample_pan, "sample pan" },
205
206     { sizeof(int),                    esd_proto_standby_mode, "standby mode" },
207     { 0,                              esd_proto_get_latency, "get latency" }
208 };
209
210 static void connection_unlink(connection *c) {
211     pa_assert(c);
212
213     if (!c->protocol)
214         return;
215
216     if (c->options) {
217         pa_esound_options_unref(c->options);
218         c->options = NULL;
219     }
220
221     if (c->sink_input) {
222         pa_sink_input_unlink(c->sink_input);
223         pa_sink_input_unref(c->sink_input);
224         c->sink_input = NULL;
225     }
226
227     if (c->source_output) {
228         pa_source_output_unlink(c->source_output);
229         pa_source_output_unref(c->source_output);
230         c->source_output = NULL;
231     }
232
233     if (c->client) {
234         pa_client_free(c->client);
235         c->client = NULL;
236     }
237
238     if (c->state == ESD_STREAMING_DATA)
239         c->protocol->n_player--;
240
241     if (c->io) {
242         pa_iochannel_free(c->io);
243         c->io = NULL;
244     }
245
246     if (c->defer_event) {
247         c->protocol->core->mainloop->defer_free(c->defer_event);
248         c->defer_event = NULL;
249     }
250
251     if (c->auth_timeout_event) {
252         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
253         c->auth_timeout_event = NULL;
254     }
255
256     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
257     c->protocol = NULL;
258     connection_unref(c);
259 }
260
261 static void connection_free(pa_object *obj) {
262     connection *c = CONNECTION(obj);
263     pa_assert(c);
264
265     if (c->input_memblockq)
266         pa_memblockq_free(c->input_memblockq);
267     if (c->output_memblockq)
268         pa_memblockq_free(c->output_memblockq);
269
270     if (c->playback.current_memblock)
271         pa_memblock_unref(c->playback.current_memblock);
272
273     pa_xfree(c->read_data);
274     pa_xfree(c->write_data);
275
276     if (c->scache.memchunk.memblock)
277         pa_memblock_unref(c->scache.memchunk.memblock);
278     pa_xfree(c->scache.name);
279
280     pa_xfree(c->original_name);
281     pa_xfree(c);
282 }
283
284 static void connection_write_prepare(connection *c, size_t length) {
285     size_t t;
286     pa_assert(c);
287
288     t = c->write_data_length+length;
289
290     if (c->write_data_alloc < t)
291         c->write_data = pa_xrealloc(c->write_data, c->write_data_alloc = t);
292
293     pa_assert(c->write_data);
294 }
295
296 static void connection_write(connection *c, const void *data, size_t length) {
297     size_t i;
298     pa_assert(c);
299
300     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
301
302     connection_write_prepare(c, length);
303
304     pa_assert(c->write_data);
305
306     i = c->write_data_length;
307     c->write_data_length += length;
308
309     memcpy((uint8_t*) c->write_data + i, data, length);
310 }
311
312 static void format_esd2native(int format, pa_bool_t swap_bytes, pa_sample_spec *ss) {
313     pa_assert(ss);
314
315     ss->channels = (uint8_t) (((format & ESD_MASK_CHAN) == ESD_STEREO) ? 2 : 1);
316     if ((format & ESD_MASK_BITS) == ESD_BITS16)
317         ss->format = swap_bytes ? PA_SAMPLE_S16RE : PA_SAMPLE_S16NE;
318     else
319         ss->format = PA_SAMPLE_U8;
320 }
321
322 static int format_native2esd(pa_sample_spec *ss) {
323     int format = 0;
324
325     format = (ss->format == PA_SAMPLE_U8) ? ESD_BITS8 : ESD_BITS16;
326     format |= (ss->channels >= 2) ? ESD_STEREO : ESD_MONO;
327
328     return format;
329 }
330
331 #define CHECK_VALIDITY(expression, ...) do { \
332     if (!(expression)) { \
333         pa_log_warn(__FILE__ ": " __VA_ARGS__); \
334         return -1; \
335     } \
336 } while(0);
337
338 /*** esound commands ***/
339
340 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length) {
341     uint32_t ekey;
342     int ok;
343
344     connection_assert_ref(c);
345     pa_assert(data);
346     pa_assert(length == (ESD_KEY_LEN + sizeof(uint32_t)));
347
348     if (!c->authorized && c->options->auth_cookie) {
349         const uint8_t*key;
350
351         if ((key = pa_auth_cookie_read(c->options->auth_cookie, ESD_KEY_LEN)))
352             if (memcmp(data, key, ESD_KEY_LEN) == 0)
353                 c->authorized = TRUE;
354     }
355
356     if (!c->authorized) {
357         pa_log("Kicked client with invalid authorization key.");
358         return -1;
359     }
360
361     if (c->auth_timeout_event) {
362         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
363         c->auth_timeout_event = NULL;
364     }
365
366     data = (const char*)data + ESD_KEY_LEN;
367
368     memcpy(&ekey, data, sizeof(uint32_t));
369     if (ekey == ESD_ENDIAN_KEY)
370         c->swap_byte_order = FALSE;
371     else if (ekey == ESD_SWAP_ENDIAN_KEY)
372         c->swap_byte_order = TRUE;
373     else {
374         pa_log_warn("Client sent invalid endian key");
375         return -1;
376     }
377
378     pa_proplist_sets(c->client->proplist, "esound.byte_order", c->swap_byte_order ? "reverse" : "native");
379
380     ok = 1;
381     connection_write(c, &ok, sizeof(int));
382     return 0;
383 }
384
385 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length) {
386     char name[ESD_NAME_MAX], *utf8_name;
387     int32_t format, rate;
388     pa_sample_spec ss;
389     size_t l;
390     pa_sink *sink = NULL;
391     pa_sink_input_new_data sdata;
392
393     connection_assert_ref(c);
394     pa_assert(data);
395     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
396
397     memcpy(&format, data, sizeof(int32_t));
398     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
399     data = (const char*) data + sizeof(int32_t);
400
401     memcpy(&rate, data, sizeof(int32_t));
402     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
403     data = (const char*) data + sizeof(int32_t);
404
405     ss.rate = (uint32_t) rate;
406     format_esd2native(format, c->swap_byte_order, &ss);
407
408     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification");
409
410     if (c->options->default_sink) {
411         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
412         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
413     }
414
415     pa_strlcpy(name, data, sizeof(name));
416
417     utf8_name = pa_utf8_filter(name);
418     pa_client_set_name(c->client, utf8_name);
419     pa_xfree(utf8_name);
420
421     c->original_name = pa_xstrdup(name);
422
423     pa_assert(!c->sink_input && !c->input_memblockq);
424
425     pa_sink_input_new_data_init(&sdata);
426     sdata.driver = __FILE__;
427     sdata.module = c->options->module;
428     sdata.client = c->client;
429     sdata.sink = sink;
430     pa_sink_input_new_data_set_sample_spec(&sdata, &ss);
431
432     pa_sink_input_new(&c->sink_input, c->protocol->core, &sdata);
433     pa_sink_input_new_data_done(&sdata);
434
435     CHECK_VALIDITY(c->sink_input, "Failed to create sink input.");
436
437     l = (size_t) ((double) pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS);
438     c->input_memblockq = pa_memblockq_new(
439             0,
440             l,
441             l,
442             pa_frame_size(&ss),
443             (size_t) -1,
444             l/PLAYBACK_BUFFER_FRAGMENTS,
445             0,
446             NULL);
447     pa_iochannel_socket_set_rcvbuf(c->io, l);
448
449     c->sink_input->parent.process_msg = sink_input_process_msg;
450     c->sink_input->pop = sink_input_pop_cb;
451     c->sink_input->process_rewind = sink_input_process_rewind_cb;
452     c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
453     c->sink_input->kill = sink_input_kill_cb;
454     c->sink_input->userdata = c;
455
456     pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY);
457
458     c->state = ESD_STREAMING_DATA;
459
460     c->protocol->n_player++;
461
462     pa_atomic_store(&c->playback.missing, (int) pa_memblockq_missing(c->input_memblockq));
463
464     pa_sink_input_put(c->sink_input);
465
466     return 0;
467 }
468
469 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length) {
470     char name[ESD_NAME_MAX], *utf8_name;
471     int32_t format, rate;
472     pa_source *source = NULL;
473     pa_sample_spec ss;
474     size_t l;
475     pa_source_output_new_data sdata;
476
477     connection_assert_ref(c);
478     pa_assert(data);
479     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
480
481     memcpy(&format, data, sizeof(int32_t));
482     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
483     data = (const char*) data + sizeof(int32_t);
484
485     memcpy(&rate, data, sizeof(int32_t));
486     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
487     data = (const char*) data + sizeof(int32_t);
488
489     ss.rate = (uint32_t) rate;
490     format_esd2native(format, c->swap_byte_order, &ss);
491
492     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
493
494     if (request == ESD_PROTO_STREAM_MON) {
495         pa_sink* sink;
496
497         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
498         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
499
500         source = sink->monitor_source;
501         CHECK_VALIDITY(source, "No such source.");
502     } else {
503         pa_assert(request == ESD_PROTO_STREAM_REC);
504
505         if (c->options->default_source) {
506             source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE);
507             CHECK_VALIDITY(source, "No such source: %s", pa_strnull(c->options->default_source));
508         }
509     }
510
511     pa_strlcpy(name, data, sizeof(name));
512
513     utf8_name = pa_utf8_filter(name);
514     pa_client_set_name(c->client, utf8_name);
515     pa_xfree(utf8_name);
516
517     c->original_name = pa_xstrdup(name);
518
519     pa_assert(!c->output_memblockq && !c->source_output);
520
521     pa_source_output_new_data_init(&sdata);
522     sdata.driver = __FILE__;
523     sdata.module = c->options->module;
524     sdata.client = c->client;
525     sdata.source = source;
526     pa_source_output_new_data_set_sample_spec(&sdata, &ss);
527
528     pa_source_output_new(&c->source_output, c->protocol->core, &sdata);
529     pa_source_output_new_data_done(&sdata);
530
531     CHECK_VALIDITY(c->source_output, "Failed to create source output.");
532
533     l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS);
534     c->output_memblockq = pa_memblockq_new(
535             0,
536             l,
537             l,
538             pa_frame_size(&ss),
539             1,
540             0,
541             0,
542             NULL);
543     pa_iochannel_socket_set_sndbuf(c->io, l);
544
545     c->source_output->push = source_output_push_cb;
546     c->source_output->kill = source_output_kill_cb;
547     c->source_output->get_latency = source_output_get_latency_cb;
548     c->source_output->userdata = c;
549
550     pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY);
551
552     c->state = ESD_STREAMING_DATA;
553
554     c->protocol->n_player++;
555
556     pa_source_output_put(c->source_output);
557
558     return 0;
559 }
560
561 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length) {
562     pa_sink *sink;
563     int32_t latency;
564
565     connection_assert_ref(c);
566     pa_assert(!data);
567     pa_assert(length == 0);
568
569     if (!(sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
570         latency = 0;
571     else {
572         double usec = (double) pa_sink_get_requested_latency(sink);
573         latency = (int) ((usec*44100)/1000000);
574     }
575
576     latency = PA_MAYBE_INT32_SWAP(c->swap_byte_order, latency);
577     connection_write(c, &latency, sizeof(int32_t));
578
579     return 0;
580 }
581
582 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length) {
583     int32_t rate = 44100, format = ESD_STEREO|ESD_BITS16;
584     int32_t response;
585     pa_sink *sink;
586
587     connection_assert_ref(c);
588     pa_assert(data);
589     pa_assert(length == sizeof(int32_t));
590
591     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK))) {
592         rate = (int32_t) sink->sample_spec.rate;
593         format = format_native2esd(&sink->sample_spec);
594     }
595
596     connection_write_prepare(c, sizeof(int32_t) * 3);
597
598     response = 0;
599     connection_write(c, &response, sizeof(int32_t));
600     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
601     connection_write(c, &rate, sizeof(int32_t));
602     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
603     connection_write(c, &format, sizeof(int32_t));
604
605     return 0;
606 }
607
608 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length) {
609     size_t t, k, s;
610     connection *conn;
611     uint32_t idx = PA_IDXSET_INVALID;
612     unsigned nsamples;
613     char terminator[sizeof(int32_t)*6+ESD_NAME_MAX];
614
615     connection_assert_ref(c);
616     pa_assert(data);
617     pa_assert(length == sizeof(int32_t));
618
619     if (esd_proto_server_info(c, request, data, length) < 0)
620         return -1;
621
622     k = sizeof(int32_t)*5+ESD_NAME_MAX;
623     s = sizeof(int32_t)*6+ESD_NAME_MAX;
624     nsamples = pa_idxset_size(c->protocol->core->scache);
625     t = s*(nsamples+1) + k*(c->protocol->n_player+1);
626
627     connection_write_prepare(c, t);
628
629     memset(terminator, 0, sizeof(terminator));
630
631     for (conn = pa_idxset_first(c->protocol->connections, &idx); conn; conn = pa_idxset_next(c->protocol->connections, &idx)) {
632         int32_t id, format = ESD_BITS16 | ESD_STEREO, rate = 44100, lvolume = ESD_VOLUME_BASE, rvolume = ESD_VOLUME_BASE;
633         char name[ESD_NAME_MAX];
634
635         if (conn->state != ESD_STREAMING_DATA)
636             continue;
637
638         pa_assert(t >= k*2+s);
639
640         if (conn->sink_input) {
641             pa_cvolume volume;
642             pa_sink_input_get_volume(conn->sink_input, &volume, TRUE);
643             rate = (int32_t) conn->sink_input->sample_spec.rate;
644             lvolume = (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
645             rvolume = (int32_t) ((volume.values[volume.channels == 2 ? 1 : 0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
646             format = format_native2esd(&conn->sink_input->sample_spec);
647         }
648
649         /* id */
650         id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) (conn->index+1));
651         connection_write(c, &id, sizeof(int32_t));
652
653         /* name */
654         memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
655         if (conn->original_name)
656             strncpy(name, conn->original_name, ESD_NAME_MAX);
657         else if (conn->client && pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME))
658             strncpy(name, pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME), ESD_NAME_MAX);
659         connection_write(c, name, ESD_NAME_MAX);
660
661         /* rate */
662         rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
663         connection_write(c, &rate, sizeof(int32_t));
664
665         /* left */
666         lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, lvolume);
667         connection_write(c, &lvolume, sizeof(int32_t));
668
669         /*right*/
670         rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rvolume);
671         connection_write(c, &rvolume, sizeof(int32_t));
672
673         /*format*/
674         format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
675         connection_write(c, &format, sizeof(int32_t));
676
677         t -= k;
678     }
679
680     pa_assert(t == s*(nsamples+1)+k);
681     t -= k;
682
683     connection_write(c, terminator, k);
684
685     if (nsamples) {
686         pa_scache_entry *ce;
687
688         idx = PA_IDXSET_INVALID;
689         for (ce = pa_idxset_first(c->protocol->core->scache, &idx); ce; ce = pa_idxset_next(c->protocol->core->scache, &idx)) {
690             int32_t id, rate, lvolume, rvolume, format, len;
691             char name[ESD_NAME_MAX];
692             pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
693             pa_cvolume volume;
694             pa_sample_spec ss;
695
696             pa_assert(t >= s*2);
697
698             if (ce->volume_is_set) {
699                 volume = ce->volume;
700                 pa_cvolume_remap(&volume, &ce->channel_map, &stereo);
701             } else
702                 pa_cvolume_reset(&volume, 2);
703
704             if (ce->memchunk.memblock)
705                 ss = ce->sample_spec;
706             else {
707                 ss.format = PA_SAMPLE_S16NE;
708                 ss.rate = 44100;
709                 ss.channels = 2;
710             }
711
712             /* id */
713             id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1));
714             connection_write(c, &id, sizeof(int32_t));
715
716             /* name */
717             memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
718             if (strncmp(ce->name, SCACHE_PREFIX, sizeof(SCACHE_PREFIX)-1) == 0)
719                 strncpy(name, ce->name+sizeof(SCACHE_PREFIX)-1, ESD_NAME_MAX);
720             else
721                 pa_snprintf(name, ESD_NAME_MAX, "native.%s", ce->name);
722             connection_write(c, name, ESD_NAME_MAX);
723
724             /* rate */
725             rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ss.rate);
726             connection_write(c, &rate, sizeof(int32_t));
727
728             /* left */
729             lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
730             connection_write(c, &lvolume, sizeof(int32_t));
731
732             /*right*/
733             rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[1]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
734             connection_write(c, &rvolume, sizeof(int32_t));
735
736             /*format*/
737             format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format_native2esd(&ss));
738             connection_write(c, &format, sizeof(int32_t));
739
740             /*length*/
741             len = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) ce->memchunk.length);
742             connection_write(c, &len, sizeof(int32_t));
743
744             t -= s;
745         }
746     }
747
748     pa_assert(t == s);
749
750     connection_write(c, terminator, s);
751
752     return 0;
753 }
754
755 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
756     int32_t ok;
757     uint32_t idx, lvolume, rvolume;
758     connection *conn;
759
760     connection_assert_ref(c);
761     pa_assert(data);
762     pa_assert(length == sizeof(int32_t)*3);
763
764     memcpy(&idx, data, sizeof(uint32_t));
765     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
766     data = (const char*)data + sizeof(uint32_t);
767
768     memcpy(&lvolume, data, sizeof(uint32_t));
769     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
770     data = (const char*)data + sizeof(uint32_t);
771
772     memcpy(&rvolume, data, sizeof(uint32_t));
773     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
774
775     if ((conn = pa_idxset_get_by_index(c->protocol->connections, idx)) && conn->sink_input) {
776         pa_cvolume volume;
777         volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
778         volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
779         volume.channels = conn->sink_input->sample_spec.channels;
780
781         pa_sink_input_set_volume(conn->sink_input, &volume, TRUE, TRUE);
782         ok = 1;
783     } else
784         ok = 0;
785
786     connection_write(c, &ok, sizeof(int32_t));
787
788     return 0;
789 }
790
791 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
792     int32_t ok = 0;
793     uint32_t idx, lvolume, rvolume;
794     pa_cvolume volume;
795     pa_scache_entry *ce;
796
797     connection_assert_ref(c);
798     pa_assert(data);
799     pa_assert(length == sizeof(int32_t)*3);
800
801     memcpy(&idx, data, sizeof(uint32_t));
802     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
803     data = (const char*)data + sizeof(uint32_t);
804
805     memcpy(&lvolume, data, sizeof(uint32_t));
806     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
807     data = (const char*)data + sizeof(uint32_t);
808
809     memcpy(&rvolume, data, sizeof(uint32_t));
810     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
811
812     volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
813     volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
814     volume.channels = 2;
815
816     if ((ce = pa_idxset_get_by_index(c->protocol->core->scache, idx))) {
817         pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
818
819         pa_cvolume_remap(&volume, &stereo, &ce->channel_map);
820         ce->volume = volume;
821         ce->volume_is_set = TRUE;
822         ok = 1;
823     }
824
825     connection_write(c, &ok, sizeof(int32_t));
826
827     return 0;
828 }
829
830 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length) {
831     pa_sample_spec ss;
832     int32_t format, rate, sc_length;
833     uint32_t idx;
834     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
835
836     connection_assert_ref(c);
837     pa_assert(data);
838     pa_assert(length == (ESD_NAME_MAX+3*sizeof(int32_t)));
839
840     memcpy(&format, data, sizeof(int32_t));
841     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
842     data = (const char*)data + sizeof(int32_t);
843
844     memcpy(&rate, data, sizeof(int32_t));
845     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
846     data = (const char*)data + sizeof(int32_t);
847
848     ss.rate = (uint32_t) rate;
849     format_esd2native(format, c->swap_byte_order, &ss);
850
851     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
852
853     memcpy(&sc_length, data, sizeof(int32_t));
854     sc_length = PA_MAYBE_INT32_SWAP(c->swap_byte_order, sc_length);
855     data = (const char*)data + sizeof(int32_t);
856
857     CHECK_VALIDITY(sc_length <= MAX_CACHE_SAMPLE_SIZE, "Sample too large (%d bytes).", (int)sc_length);
858
859     strcpy(name, SCACHE_PREFIX);
860     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
861
862     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
863
864     pa_assert(!c->scache.memchunk.memblock);
865     c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) sc_length);
866     c->scache.memchunk.index = 0;
867     c->scache.memchunk.length = (size_t) sc_length;
868     c->scache.sample_spec = ss;
869     pa_assert(!c->scache.name);
870     c->scache.name = pa_xstrdup(name);
871
872     c->state = ESD_CACHING_SAMPLE;
873
874     pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, c->client->proplist, &idx);
875
876     idx += 1;
877     connection_write(c, &idx, sizeof(uint32_t));
878
879     return 0;
880 }
881
882 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length) {
883     int32_t ok;
884     uint32_t idx;
885     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
886
887     connection_assert_ref(c);
888     pa_assert(data);
889     pa_assert(length == ESD_NAME_MAX);
890
891     strcpy(name, SCACHE_PREFIX);
892     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
893
894     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
895
896     ok = -1;
897     if ((idx = pa_scache_get_id_by_name(c->protocol->core, name)) != PA_IDXSET_INVALID)
898         ok = (int32_t) idx + 1;
899
900     connection_write(c, &ok, sizeof(int32_t));
901
902     return 0;
903 }
904
905 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length) {
906     int32_t ok;
907     const char *name;
908     uint32_t idx;
909
910     connection_assert_ref(c);
911     pa_assert(data);
912     pa_assert(length == sizeof(int32_t));
913
914     memcpy(&idx, data, sizeof(uint32_t));
915     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
916
917     ok = 0;
918
919     if ((name = pa_scache_get_name_by_id(c->protocol->core, idx))) {
920         if (request == ESD_PROTO_SAMPLE_PLAY) {
921             pa_sink *sink;
922
923             if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
924                 if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM, c->client->proplist, NULL) >= 0)
925                     ok = (int32_t) idx + 1;
926         } else {
927             pa_assert(request == ESD_PROTO_SAMPLE_FREE);
928
929             if (pa_scache_remove_item(c->protocol->core, name) >= 0)
930                 ok = (int32_t) idx + 1;
931         }
932     }
933
934     connection_write(c, &ok, sizeof(int32_t));
935
936     return 0;
937 }
938
939 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length) {
940     int32_t ok = 1;
941
942     connection_assert_ref(c);
943
944     connection_write_prepare(c, sizeof(int32_t) * 2);
945     connection_write(c, &ok, sizeof(int32_t));
946
947     if (request == ESD_PROTO_STANDBY)
948         ok = pa_sink_suspend_all(c->protocol->core, TRUE, PA_SUSPEND_USER) >= 0;
949     else {
950         pa_assert(request == ESD_PROTO_RESUME);
951         ok = pa_sink_suspend_all(c->protocol->core, FALSE, PA_SUSPEND_USER) >= 0;
952     }
953
954     connection_write(c, &ok, sizeof(int32_t));
955
956     return 0;
957 }
958
959 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length) {
960     int32_t mode;
961     pa_sink *sink, *source;
962
963     connection_assert_ref(c);
964
965     mode = ESM_RUNNING;
966
967     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
968         if (pa_sink_get_state(sink) == PA_SINK_SUSPENDED)
969             mode = ESM_ON_STANDBY;
970
971     if ((source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE)))
972         if (pa_source_get_state(source) == PA_SOURCE_SUSPENDED)
973             mode = ESM_ON_STANDBY;
974
975     mode = PA_MAYBE_INT32_SWAP(c->swap_byte_order, mode);
976
977     connection_write(c, &mode, sizeof(mode));
978     return 0;
979 }
980
981 /*** client callbacks ***/
982
983 static void client_kill_cb(pa_client *c) {
984     pa_assert(c);
985
986     connection_unlink(CONNECTION(c->userdata));
987 }
988
989 /*** pa_iochannel callbacks ***/
990
991 static int do_read(connection *c) {
992     connection_assert_ref(c);
993
994 /*     pa_log("READ"); */
995
996     if (c->state == ESD_NEXT_REQUEST) {
997         ssize_t r;
998         pa_assert(c->read_data_length < sizeof(c->request));
999
1000         if ((r = pa_iochannel_read(c->io,
1001                                    ((uint8_t*) &c->request) + c->read_data_length,
1002                                    sizeof(c->request) - c->read_data_length)) <= 0) {
1003
1004             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1005                 return 0;
1006
1007             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1008             return -1;
1009         }
1010
1011         c->read_data_length += (size_t) r;
1012
1013         if (c->read_data_length >= sizeof(c->request)) {
1014             struct proto_handler *handler;
1015
1016             c->request = PA_MAYBE_INT32_SWAP(c->swap_byte_order, c->request);
1017
1018             if (c->request < ESD_PROTO_CONNECT || c->request >= ESD_PROTO_MAX) {
1019                 pa_log("recieved invalid request.");
1020                 return -1;
1021             }
1022
1023             handler = proto_map+c->request;
1024
1025 /*             pa_log("executing request #%u", c->request); */
1026
1027             if (!handler->proc) {
1028                 pa_log("recieved unimplemented request #%u.", c->request);
1029                 return -1;
1030             }
1031
1032             if (handler->data_length == 0) {
1033                 c->read_data_length = 0;
1034
1035                 if (handler->proc(c, c->request, NULL, 0) < 0)
1036                     return -1;
1037
1038             } else {
1039                 if (c->read_data_alloc < handler->data_length)
1040                     c->read_data = pa_xrealloc(c->read_data, c->read_data_alloc = handler->data_length);
1041                 pa_assert(c->read_data);
1042
1043                 c->state = ESD_NEEDS_REQDATA;
1044                 c->read_data_length = 0;
1045             }
1046         }
1047
1048     } else if (c->state == ESD_NEEDS_REQDATA) {
1049         ssize_t r;
1050         struct proto_handler *handler = proto_map+c->request;
1051
1052         pa_assert(handler->proc);
1053
1054         pa_assert(c->read_data && c->read_data_length < handler->data_length);
1055
1056         if ((r = pa_iochannel_read(c->io,
1057                                    (uint8_t*) c->read_data + c->read_data_length,
1058                                    handler->data_length - c->read_data_length)) <= 0) {
1059
1060             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1061                 return 0;
1062
1063             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1064             return -1;
1065         }
1066
1067         c->read_data_length += (size_t) r;
1068         if (c->read_data_length >= handler->data_length) {
1069             size_t l = c->read_data_length;
1070             pa_assert(handler->proc);
1071
1072             c->state = ESD_NEXT_REQUEST;
1073             c->read_data_length = 0;
1074
1075             if (handler->proc(c, c->request, c->read_data, l) < 0)
1076                 return -1;
1077         }
1078     } else if (c->state == ESD_CACHING_SAMPLE) {
1079         ssize_t r;
1080         void *p;
1081
1082         pa_assert(c->scache.memchunk.memblock);
1083         pa_assert(c->scache.name);
1084         pa_assert(c->scache.memchunk.index < c->scache.memchunk.length);
1085
1086         p = pa_memblock_acquire(c->scache.memchunk.memblock);
1087         r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index);
1088         pa_memblock_release(c->scache.memchunk.memblock);
1089
1090         if (r <= 0) {
1091             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1092                 return 0;
1093
1094             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1095             return -1;
1096         }
1097
1098         c->scache.memchunk.index += (size_t) r;
1099         pa_assert(c->scache.memchunk.index <= c->scache.memchunk.length);
1100
1101         if (c->scache.memchunk.index == c->scache.memchunk.length) {
1102             uint32_t idx;
1103
1104             c->scache.memchunk.index = 0;
1105             pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, c->client->proplist, &idx);
1106
1107             pa_memblock_unref(c->scache.memchunk.memblock);
1108             pa_memchunk_reset(&c->scache.memchunk);
1109
1110             pa_xfree(c->scache.name);
1111             c->scache.name = NULL;
1112
1113             c->state = ESD_NEXT_REQUEST;
1114
1115             idx += 1;
1116             connection_write(c, &idx, sizeof(uint32_t));
1117         }
1118
1119     } else if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1120         pa_memchunk chunk;
1121         ssize_t r;
1122         size_t l;
1123         void *p;
1124         size_t space = 0;
1125
1126         pa_assert(c->input_memblockq);
1127
1128 /*         pa_log("STREAMING_DATA"); */
1129
1130         if (!(l = (size_t) pa_atomic_load(&c->playback.missing)))
1131             return 0;
1132
1133         if (c->playback.current_memblock) {
1134
1135             space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index;
1136
1137             if (space <= 0) {
1138                 pa_memblock_unref(c->playback.current_memblock);
1139                 c->playback.current_memblock = NULL;
1140             }
1141         }
1142
1143         if (!c->playback.current_memblock) {
1144             pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) -1));
1145             c->playback.memblock_index = 0;
1146
1147             space = pa_memblock_get_length(c->playback.current_memblock);
1148         }
1149
1150         if (l > space)
1151             l = space;
1152
1153         p = pa_memblock_acquire(c->playback.current_memblock);
1154         r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l);
1155         pa_memblock_release(c->playback.current_memblock);
1156
1157         if (r <= 0) {
1158
1159             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1160                 return 0;
1161
1162             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1163             return -1;
1164         }
1165
1166         chunk.memblock = c->playback.current_memblock;
1167         chunk.index = c->playback.memblock_index;
1168         chunk.length = (size_t) r;
1169
1170         c->playback.memblock_index += (size_t) r;
1171
1172         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
1173         pa_atomic_sub(&c->playback.missing, (int) r);
1174     }
1175
1176     return 0;
1177 }
1178
1179 static int do_write(connection *c) {
1180     connection_assert_ref(c);
1181
1182 /*     pa_log("WRITE"); */
1183
1184     if (c->write_data_length) {
1185         ssize_t r;
1186
1187         pa_assert(c->write_data_index < c->write_data_length);
1188         if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) {
1189
1190             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1191                 return 0;
1192
1193             pa_log("write(): %s", pa_cstrerror(errno));
1194             return -1;
1195         }
1196
1197         c->write_data_index += (size_t) r;
1198         if (c->write_data_index >= c->write_data_length)
1199             c->write_data_length = c->write_data_index = 0;
1200
1201     } else if (c->state == ESD_STREAMING_DATA && c->source_output) {
1202         pa_memchunk chunk;
1203         ssize_t r;
1204         void *p;
1205
1206         if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
1207             return 0;
1208
1209         pa_assert(chunk.memblock);
1210         pa_assert(chunk.length);
1211
1212         p = pa_memblock_acquire(chunk.memblock);
1213         r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
1214         pa_memblock_release(chunk.memblock);
1215
1216         pa_memblock_unref(chunk.memblock);
1217
1218         if (r < 0) {
1219
1220             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1221                 return 0;
1222
1223             pa_log("write(): %s", pa_cstrerror(errno));
1224             return -1;
1225         }
1226
1227         pa_memblockq_drop(c->output_memblockq, (size_t) r);
1228     }
1229
1230     return 0;
1231 }
1232
1233 static void do_work(connection *c) {
1234     connection_assert_ref(c);
1235
1236     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
1237
1238     if (c->dead)
1239         return;
1240
1241     if (pa_iochannel_is_readable(c->io))
1242         if (do_read(c) < 0)
1243             goto fail;
1244
1245     if (c->state == ESD_STREAMING_DATA && !c->sink_input && pa_iochannel_is_hungup(c->io))
1246         /* In case we are in capture mode we will never call read()
1247          * on the socket, hence we need to detect the hangup manually
1248          * here, instead of simply waiting for read() to return 0. */
1249         goto fail;
1250
1251     if (pa_iochannel_is_writable(c->io))
1252         if (do_write(c) < 0)
1253             goto fail;
1254
1255     return;
1256
1257 fail:
1258
1259     if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1260         c->dead = TRUE;
1261
1262         pa_iochannel_free(c->io);
1263         c->io = NULL;
1264
1265         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
1266     } else
1267         connection_unlink(c);
1268 }
1269
1270 static void io_callback(pa_iochannel*io, void *userdata) {
1271     connection *c = CONNECTION(userdata);
1272
1273     connection_assert_ref(c);
1274     pa_assert(io);
1275
1276     do_work(c);
1277 }
1278
1279 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
1280     connection *c = CONNECTION(userdata);
1281
1282     connection_assert_ref(c);
1283     pa_assert(e);
1284
1285     do_work(c);
1286 }
1287
1288 static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1289     connection *c = CONNECTION(o);
1290     connection_assert_ref(c);
1291
1292     if (!c->protocol)
1293         return -1;
1294
1295     switch (code) {
1296         case CONNECTION_MESSAGE_REQUEST_DATA:
1297             do_work(c);
1298             break;
1299
1300         case CONNECTION_MESSAGE_POST_DATA:
1301 /*             pa_log("got data %u", chunk->length); */
1302             pa_memblockq_push_align(c->output_memblockq, chunk);
1303             do_work(c);
1304             break;
1305
1306         case CONNECTION_MESSAGE_UNLINK_CONNECTION:
1307             connection_unlink(c);
1308             break;
1309     }
1310
1311     return 0;
1312 }
1313
1314 /*** sink_input callbacks ***/
1315
1316 /* Called from thread context */
1317 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1318     pa_sink_input *i = PA_SINK_INPUT(o);
1319     connection*c;
1320
1321     pa_sink_input_assert_ref(i);
1322     c = CONNECTION(i->userdata);
1323     connection_assert_ref(c);
1324
1325     switch (code) {
1326
1327         case SINK_INPUT_MESSAGE_POST_DATA: {
1328             pa_assert(chunk);
1329
1330             /* New data from the main loop */
1331             pa_memblockq_push_align(c->input_memblockq, chunk);
1332
1333             if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) {
1334                 pa_log_debug("Requesting rewind due to end of underrun.");
1335                 pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE, FALSE);
1336             }
1337
1338 /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
1339
1340             return 0;
1341         }
1342
1343         case SINK_INPUT_MESSAGE_DISABLE_PREBUF:
1344             pa_memblockq_prebuf_disable(c->input_memblockq);
1345             return 0;
1346
1347         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1348             pa_usec_t *r = userdata;
1349
1350             *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
1351
1352             /* Fall through, the default handler will add in the extra
1353              * latency added by the resampler */
1354         }
1355
1356         default:
1357             return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1358     }
1359 }
1360
1361 /* Called from thread context */
1362 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
1363     connection*c;
1364
1365     pa_sink_input_assert_ref(i);
1366     c = CONNECTION(i->userdata);
1367     connection_assert_ref(c);
1368     pa_assert(chunk);
1369
1370     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
1371
1372         c->playback.underrun = TRUE;
1373
1374         if (c->dead && pa_sink_input_safe_to_remove(i))
1375             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
1376
1377         return -1;
1378     } else {
1379         size_t m;
1380
1381         chunk->length = PA_MIN(length, chunk->length);
1382
1383         c->playback.underrun = FALSE;
1384
1385         pa_memblockq_drop(c->input_memblockq, chunk->length);
1386         m = pa_memblockq_pop_missing(c->input_memblockq);
1387
1388         if (m > 0)
1389             if (pa_atomic_add(&c->playback.missing, (int) m) <= 0)
1390                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1391
1392         return 0;
1393     }
1394 }
1395
1396 /* Called from thread context */
1397 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1398     connection *c;
1399
1400     pa_sink_input_assert_ref(i);
1401     c = CONNECTION(i->userdata);
1402     connection_assert_ref(c);
1403
1404     /* If we are in an underrun, then we don't rewind */
1405     if (i->thread_info.underrun_for > 0)
1406         return;
1407
1408     pa_memblockq_rewind(c->input_memblockq, nbytes);
1409 }
1410
1411 /* Called from thread context */
1412 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1413     connection *c;
1414
1415     pa_sink_input_assert_ref(i);
1416     c = CONNECTION(i->userdata);
1417     connection_assert_ref(c);
1418
1419     pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);
1420 }
1421
1422 static void sink_input_kill_cb(pa_sink_input *i) {
1423     pa_sink_input_assert_ref(i);
1424
1425     connection_unlink(CONNECTION(i->userdata));
1426 }
1427
1428 /*** source_output callbacks ***/
1429
1430 /* Called from thread context */
1431 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1432     connection *c;
1433
1434     pa_source_output_assert_ref(o);
1435     c = CONNECTION(o->userdata);
1436     pa_assert(c);
1437     pa_assert(chunk);
1438
1439     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1440 }
1441
1442 static void source_output_kill_cb(pa_source_output *o) {
1443     pa_source_output_assert_ref(o);
1444
1445     connection_unlink(CONNECTION(o->userdata));
1446 }
1447
1448 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1449     connection*c;
1450
1451     pa_source_output_assert_ref(o);
1452     c = CONNECTION(o->userdata);
1453     pa_assert(c);
1454
1455     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
1456 }
1457
1458 /*** entry points ***/
1459
1460 static void auth_timeout(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
1461     connection *c = CONNECTION(userdata);
1462
1463     pa_assert(m);
1464     connection_assert_ref(c);
1465     pa_assert(c->auth_timeout_event == e);
1466
1467     if (!c->authorized)
1468         connection_unlink(c);
1469 }
1470
1471 void pa_esound_protocol_connect(pa_esound_protocol *p, pa_iochannel *io, pa_esound_options *o) {
1472     connection *c;
1473     char pname[128];
1474     pa_client_new_data data;
1475     pa_client *client;
1476
1477     pa_assert(p);
1478     pa_assert(io);
1479     pa_assert(o);
1480
1481     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
1482         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
1483         pa_iochannel_free(io);
1484         return;
1485     }
1486
1487     pa_client_new_data_init(&data);
1488     data.module = o->module;
1489     data.driver = __FILE__;
1490     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
1491     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "EsounD client (%s)", pname);
1492     pa_proplist_sets(data.proplist, "esound-protocol.peer", pname);
1493     client = pa_client_new(p->core, &data);
1494     pa_client_new_data_done(&data);
1495
1496     if (!client)
1497         return;
1498
1499     c = pa_msgobject_new(connection);
1500     c->parent.parent.free = connection_free;
1501     c->parent.process_msg = connection_process_msg;
1502     c->protocol = p;
1503     c->io = io;
1504     pa_iochannel_set_callback(c->io, io_callback, c);
1505
1506     c->client = client;
1507     c->client->kill = client_kill_cb;
1508     c->client->userdata = c;
1509
1510     c->options = pa_esound_options_ref(o);
1511     c->authorized = FALSE;
1512     c->swap_byte_order = FALSE;
1513     c->dead = FALSE;
1514
1515     c->read_data_length = 0;
1516     c->read_data = pa_xmalloc(c->read_data_alloc = proto_map[ESD_PROTO_CONNECT].data_length);
1517
1518     c->write_data_length = c->write_data_index = c->write_data_alloc = 0;
1519     c->write_data = NULL;
1520
1521     c->state = ESD_NEEDS_REQDATA;
1522     c->request = ESD_PROTO_CONNECT;
1523
1524     c->sink_input = NULL;
1525     c->input_memblockq = NULL;
1526
1527     c->source_output = NULL;
1528     c->output_memblockq = NULL;
1529
1530     c->playback.current_memblock = NULL;
1531     c->playback.memblock_index = 0;
1532     c->playback.underrun = TRUE;
1533     pa_atomic_store(&c->playback.missing, 0);
1534
1535     pa_memchunk_reset(&c->scache.memchunk);
1536     c->scache.name = NULL;
1537
1538     c->original_name = NULL;
1539
1540     if (o->auth_anonymous) {
1541         pa_log_info("Client authenticated anonymously.");
1542         c->authorized = TRUE;
1543     }
1544
1545     if (!c->authorized &&
1546         o->auth_ip_acl &&
1547         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
1548
1549         pa_log_info("Client authenticated by IP ACL.");
1550         c->authorized = TRUE;
1551     }
1552
1553     if (!c->authorized)
1554         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
1555     else
1556         c->auth_timeout_event = NULL;
1557
1558     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
1559     p->core->mainloop->defer_enable(c->defer_event, 0);
1560
1561     pa_idxset_put(p->connections, c, &c->index);
1562 }
1563
1564 void pa_esound_protocol_disconnect(pa_esound_protocol *p, pa_module *m) {
1565     connection *c;
1566     void *state = NULL;
1567
1568     pa_assert(p);
1569     pa_assert(m);
1570
1571     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
1572         if (c->options->module == m)
1573             connection_unlink(c);
1574 }
1575
1576 static pa_esound_protocol* esound_protocol_new(pa_core *c) {
1577     pa_esound_protocol *p;
1578
1579     pa_assert(c);
1580
1581     p = pa_xnew(pa_esound_protocol, 1);
1582     PA_REFCNT_INIT(p);
1583     p->core = c;
1584     p->connections = pa_idxset_new(NULL, NULL);
1585     p->n_player = 0;
1586
1587     pa_assert_se(pa_shared_set(c, "esound-protocol", p) >= 0);
1588
1589     return p;
1590 }
1591
1592 pa_esound_protocol* pa_esound_protocol_get(pa_core *c) {
1593     pa_esound_protocol *p;
1594
1595     if ((p = pa_shared_get(c, "esound-protocol")))
1596         return pa_esound_protocol_ref(p);
1597
1598     return esound_protocol_new(c);
1599 }
1600
1601 pa_esound_protocol* pa_esound_protocol_ref(pa_esound_protocol *p) {
1602     pa_assert(p);
1603     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1604
1605     PA_REFCNT_INC(p);
1606
1607     return p;
1608 }
1609
1610 void pa_esound_protocol_unref(pa_esound_protocol *p) {
1611     connection *c;
1612     pa_assert(p);
1613     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1614
1615     if (PA_REFCNT_DEC(p) > 0)
1616         return;
1617
1618     while ((c = pa_idxset_first(p->connections, NULL)))
1619         connection_unlink(c);
1620
1621     pa_idxset_free(p->connections, NULL, NULL);
1622
1623     pa_assert_se(pa_shared_remove(p->core, "esound-protocol") >= 0);
1624
1625     pa_xfree(p);
1626 }
1627
1628 pa_esound_options* pa_esound_options_new(void) {
1629     pa_esound_options *o;
1630
1631     o = pa_xnew0(pa_esound_options, 1);
1632     PA_REFCNT_INIT(o);
1633
1634     return o;
1635 }
1636
1637 pa_esound_options* pa_esound_options_ref(pa_esound_options *o) {
1638     pa_assert(o);
1639     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1640
1641     PA_REFCNT_INC(o);
1642
1643     return o;
1644 }
1645
1646 void pa_esound_options_unref(pa_esound_options *o) {
1647     pa_assert(o);
1648     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1649
1650     if (PA_REFCNT_DEC(o) > 0)
1651         return;
1652
1653     if (o->auth_ip_acl)
1654         pa_ip_acl_free(o->auth_ip_acl);
1655
1656     if (o->auth_cookie)
1657         pa_auth_cookie_unref(o->auth_cookie);
1658
1659     pa_xfree(o->default_sink);
1660     pa_xfree(o->default_source);
1661
1662     pa_xfree(o);
1663 }
1664
1665 int pa_esound_options_parse(pa_esound_options *o, pa_core *c, pa_modargs *ma) {
1666     pa_bool_t enabled;
1667     const char *acl;
1668
1669     pa_assert(o);
1670     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1671     pa_assert(ma);
1672
1673     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
1674         pa_log("auth-anonymous= expects a boolean argument.");
1675         return -1;
1676     }
1677
1678     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
1679         pa_ip_acl *ipa;
1680
1681         if (!(ipa = pa_ip_acl_new(acl))) {
1682             pa_log("Failed to parse IP ACL '%s'", acl);
1683             return -1;
1684         }
1685
1686         if (o->auth_ip_acl)
1687             pa_ip_acl_free(o->auth_ip_acl);
1688
1689         o->auth_ip_acl = ipa;
1690     }
1691
1692     enabled = TRUE;
1693     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
1694         pa_log("auth-cookie-enabled= expects a boolean argument.");
1695         return -1;
1696     }
1697
1698     if (o->auth_cookie)
1699         pa_auth_cookie_unref(o->auth_cookie);
1700
1701     if (enabled) {
1702         const char *cn;
1703
1704         /* The new name for this is 'auth-cookie', for compat reasons
1705          * we check the old name too */
1706         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
1707             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
1708                 cn = DEFAULT_COOKIE_FILE;
1709
1710         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, ESD_KEY_LEN)))
1711             return -1;
1712
1713     } else
1714         o->auth_cookie = NULL;
1715
1716     pa_xfree(o->default_sink);
1717     o->default_sink = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
1718
1719     pa_xfree(o->default_source);
1720     o->default_source = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
1721
1722     return 0;
1723 }