Merge commit 'origin/master-tx'
[platform/upstream/pulseaudio.git] / src / pulsecore / protocol-esound.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2004-2006 Lennart Poettering
5   Copyright 2006 Pierre Ossman <ossman@cendio.se> for Cendio AB
6
7   PulseAudio is free software; you can redistribute it and/or modify
8   it under the terms of the GNU Lesser General Public License as published
9   by the Free Software Foundation; either version 2.1 of the License,
10   or (at your option) any later version.
11
12   PulseAudio is distributed in the hope that it will be useful, but
13   WITHOUT ANY WARRANTY; without even the implied warranty of
14   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15   General Public License for more details.
16
17   You should have received a copy of the GNU Lesser General Public License
18   along with PulseAudio; if not, write to the Free Software
19   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20   USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <errno.h>
28 #include <string.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <limits.h>
32
33 #include <pulse/rtclock.h>
34 #include <pulse/sample.h>
35 #include <pulse/timeval.h>
36 #include <pulse/utf8.h>
37 #include <pulse/xmalloc.h>
38 #include <pulse/proplist.h>
39
40 #include <pulsecore/esound.h>
41 #include <pulsecore/memblock.h>
42 #include <pulsecore/client.h>
43 #include <pulsecore/sink-input.h>
44 #include <pulsecore/sink.h>
45 #include <pulsecore/source-output.h>
46 #include <pulsecore/source.h>
47 #include <pulsecore/core-scache.h>
48 #include <pulsecore/sample-util.h>
49 #include <pulsecore/authkey.h>
50 #include <pulsecore/namereg.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/core-util.h>
53 #include <pulsecore/core-error.h>
54 #include <pulsecore/ipacl.h>
55 #include <pulsecore/macro.h>
56 #include <pulsecore/thread-mq.h>
57 #include <pulsecore/shared.h>
58
59 #include "endianmacros.h"
60
61 #include "protocol-esound.h"
62
63 /* Don't accept more connection than this */
64 #define MAX_CONNECTIONS 64
65
66 /* Kick a client if it doesn't authenticate within this time */
67 #define AUTH_TIMEOUT (5*PA_USEC_PER_SEC)
68
69 #define DEFAULT_COOKIE_FILE ".esd_auth"
70
71 #define PLAYBACK_BUFFER_SECONDS (.25)
72 #define PLAYBACK_BUFFER_FRAGMENTS (10)
73 #define RECORD_BUFFER_SECONDS (5)
74
75 #define MAX_CACHE_SAMPLE_SIZE (2048000)
76
77 #define DEFAULT_SINK_LATENCY (150*PA_USEC_PER_MSEC)
78 #define DEFAULT_SOURCE_LATENCY (150*PA_USEC_PER_MSEC)
79
80 #define SCACHE_PREFIX "esound."
81
82 /* This is heavily based on esound's code */
83
84 typedef struct connection {
85     pa_msgobject parent;
86
87     uint32_t index;
88     pa_bool_t dead;
89     pa_esound_protocol *protocol;
90     pa_esound_options *options;
91     pa_iochannel *io;
92     pa_client *client;
93     pa_bool_t authorized, swap_byte_order;
94     void *write_data;
95     size_t write_data_alloc, write_data_index, write_data_length;
96     void *read_data;
97     size_t read_data_alloc, read_data_length;
98     esd_proto_t request;
99     esd_client_state_t state;
100     pa_sink_input *sink_input;
101     pa_source_output *source_output;
102     pa_memblockq *input_memblockq, *output_memblockq;
103     pa_defer_event *defer_event;
104
105     char *original_name;
106
107     struct {
108         pa_memblock *current_memblock;
109         size_t memblock_index;
110         pa_atomic_t missing;
111         pa_bool_t underrun;
112     } playback;
113
114     struct {
115         pa_memchunk memchunk;
116         char *name;
117         pa_sample_spec sample_spec;
118     } scache;
119
120     pa_time_event *auth_timeout_event;
121 } connection;
122
123 PA_DECLARE_CLASS(connection);
124 #define CONNECTION(o) (connection_cast(o))
125 static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
126
127 struct pa_esound_protocol {
128     PA_REFCNT_DECLARE;
129
130     pa_core *core;
131     pa_idxset *connections;
132     unsigned n_player;
133 };
134
135 enum {
136     SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
137     SINK_INPUT_MESSAGE_DISABLE_PREBUF
138 };
139
140 enum {
141     CONNECTION_MESSAGE_REQUEST_DATA,
142     CONNECTION_MESSAGE_POST_DATA,
143     CONNECTION_MESSAGE_UNLINK_CONNECTION
144 };
145
146 typedef struct proto_handler {
147     size_t data_length;
148     int (*proc)(connection *c, esd_proto_t request, const void *data, size_t length);
149     const char *description;
150 } esd_proto_handler_info_t;
151
152 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk);
153 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes);
154 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes);
155 static void sink_input_kill_cb(pa_sink_input *i);
156 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
157 static pa_usec_t source_output_get_latency_cb(pa_source_output *o);
158
159 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);
160 static void source_output_kill_cb(pa_source_output *o);
161
162 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length);
163 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length);
164 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length);
165 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length);
166 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length);
167 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length);
168 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length);
169 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length);
170 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length);
171 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length);
172 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length);
173 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length);
174 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length);
175
176 /* the big map of protocol handler info */
177 static struct proto_handler proto_map[ESD_PROTO_MAX] = {
178     { ESD_KEY_LEN + sizeof(int),      esd_proto_connect, "connect" },
179     { ESD_KEY_LEN + sizeof(int),      NULL, "lock" },
180     { ESD_KEY_LEN + sizeof(int),      NULL, "unlock" },
181
182     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_play, "stream play" },
183     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream rec" },
184     { ESD_NAME_MAX + 2 * sizeof(int), esd_proto_stream_record, "stream mon" },
185
186     { ESD_NAME_MAX + 3 * sizeof(int), esd_proto_sample_cache, "sample cache" },                      /* 6 */
187     { sizeof(int),                    esd_proto_sample_free_or_play, "sample free" },
188     { sizeof(int),                    esd_proto_sample_free_or_play, "sample play" },                /* 8 */
189     { sizeof(int),                    NULL, "sample loop" },
190     { sizeof(int),                    NULL, "sample stop" },
191     { (size_t) -1,                    NULL, "TODO: sample kill" },
192
193     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "standby" },
194     { ESD_KEY_LEN + sizeof(int),      esd_proto_standby_or_resume, "resume" },                       /* 13 */
195
196     { ESD_NAME_MAX,                   esd_proto_sample_get_id, "sample getid" },                     /* 14 */
197     { ESD_NAME_MAX + 2 * sizeof(int), NULL, "stream filter" },
198
199     { sizeof(int),                    esd_proto_server_info, "server info" },
200     { sizeof(int),                    esd_proto_all_info, "all info" },
201     { (size_t) -1,                    NULL, "TODO: subscribe" },
202     { (size_t) -1,                    NULL, "TODO: unsubscribe" },
203
204     { 3 * sizeof(int),                esd_proto_stream_pan, "stream pan"},
205     { 3 * sizeof(int),                esd_proto_sample_pan, "sample pan" },
206
207     { sizeof(int),                    esd_proto_standby_mode, "standby mode" },
208     { 0,                              esd_proto_get_latency, "get latency" }
209 };
210
211 static void connection_unlink(connection *c) {
212     pa_assert(c);
213
214     if (!c->protocol)
215         return;
216
217     if (c->options) {
218         pa_esound_options_unref(c->options);
219         c->options = NULL;
220     }
221
222     if (c->sink_input) {
223         pa_sink_input_unlink(c->sink_input);
224         pa_sink_input_unref(c->sink_input);
225         c->sink_input = NULL;
226     }
227
228     if (c->source_output) {
229         pa_source_output_unlink(c->source_output);
230         pa_source_output_unref(c->source_output);
231         c->source_output = NULL;
232     }
233
234     if (c->client) {
235         pa_client_free(c->client);
236         c->client = NULL;
237     }
238
239     if (c->state == ESD_STREAMING_DATA)
240         c->protocol->n_player--;
241
242     if (c->io) {
243         pa_iochannel_free(c->io);
244         c->io = NULL;
245     }
246
247     if (c->defer_event) {
248         c->protocol->core->mainloop->defer_free(c->defer_event);
249         c->defer_event = NULL;
250     }
251
252     if (c->auth_timeout_event) {
253         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
254         c->auth_timeout_event = NULL;
255     }
256
257     pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
258     c->protocol = NULL;
259     connection_unref(c);
260 }
261
262 static void connection_free(pa_object *obj) {
263     connection *c = CONNECTION(obj);
264     pa_assert(c);
265
266     if (c->input_memblockq)
267         pa_memblockq_free(c->input_memblockq);
268     if (c->output_memblockq)
269         pa_memblockq_free(c->output_memblockq);
270
271     if (c->playback.current_memblock)
272         pa_memblock_unref(c->playback.current_memblock);
273
274     pa_xfree(c->read_data);
275     pa_xfree(c->write_data);
276
277     if (c->scache.memchunk.memblock)
278         pa_memblock_unref(c->scache.memchunk.memblock);
279     pa_xfree(c->scache.name);
280
281     pa_xfree(c->original_name);
282     pa_xfree(c);
283 }
284
285 static void connection_write_prepare(connection *c, size_t length) {
286     size_t t;
287     pa_assert(c);
288
289     t = c->write_data_length+length;
290
291     if (c->write_data_alloc < t)
292         c->write_data = pa_xrealloc(c->write_data, c->write_data_alloc = t);
293
294     pa_assert(c->write_data);
295 }
296
297 static void connection_write(connection *c, const void *data, size_t length) {
298     size_t i;
299     pa_assert(c);
300
301     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
302
303     connection_write_prepare(c, length);
304
305     pa_assert(c->write_data);
306
307     i = c->write_data_length;
308     c->write_data_length += length;
309
310     memcpy((uint8_t*) c->write_data + i, data, length);
311 }
312
313 static void format_esd2native(int format, pa_bool_t swap_bytes, pa_sample_spec *ss) {
314     pa_assert(ss);
315
316     ss->channels = (uint8_t) (((format & ESD_MASK_CHAN) == ESD_STEREO) ? 2 : 1);
317     if ((format & ESD_MASK_BITS) == ESD_BITS16)
318         ss->format = swap_bytes ? PA_SAMPLE_S16RE : PA_SAMPLE_S16NE;
319     else
320         ss->format = PA_SAMPLE_U8;
321 }
322
323 static int format_native2esd(pa_sample_spec *ss) {
324     int format = 0;
325
326     format = (ss->format == PA_SAMPLE_U8) ? ESD_BITS8 : ESD_BITS16;
327     format |= (ss->channels >= 2) ? ESD_STEREO : ESD_MONO;
328
329     return format;
330 }
331
332 #define CHECK_VALIDITY(expression, ...) do { \
333     if (!(expression)) { \
334         pa_log_warn(__FILE__ ": " __VA_ARGS__); \
335         return -1; \
336     } \
337 } while(0);
338
339 /*** esound commands ***/
340
341 static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length) {
342     uint32_t ekey;
343     int ok;
344
345     connection_assert_ref(c);
346     pa_assert(data);
347     pa_assert(length == (ESD_KEY_LEN + sizeof(uint32_t)));
348
349     if (!c->authorized && c->options->auth_cookie) {
350         const uint8_t*key;
351
352         if ((key = pa_auth_cookie_read(c->options->auth_cookie, ESD_KEY_LEN)))
353             if (memcmp(data, key, ESD_KEY_LEN) == 0)
354                 c->authorized = TRUE;
355     }
356
357     if (!c->authorized) {
358         pa_log("Kicked client with invalid authorization key.");
359         return -1;
360     }
361
362     if (c->auth_timeout_event) {
363         c->protocol->core->mainloop->time_free(c->auth_timeout_event);
364         c->auth_timeout_event = NULL;
365     }
366
367     data = (const char*)data + ESD_KEY_LEN;
368
369     memcpy(&ekey, data, sizeof(uint32_t));
370     if (ekey == ESD_ENDIAN_KEY)
371         c->swap_byte_order = FALSE;
372     else if (ekey == ESD_SWAP_ENDIAN_KEY)
373         c->swap_byte_order = TRUE;
374     else {
375         pa_log_warn("Client sent invalid endian key");
376         return -1;
377     }
378
379     pa_proplist_sets(c->client->proplist, "esound.byte_order", c->swap_byte_order ? "reverse" : "native");
380
381     ok = 1;
382     connection_write(c, &ok, sizeof(int));
383     return 0;
384 }
385
386 static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length) {
387     char name[ESD_NAME_MAX], *utf8_name;
388     int32_t format, rate;
389     pa_sample_spec ss;
390     size_t l;
391     pa_sink *sink = NULL;
392     pa_sink_input_new_data sdata;
393
394     connection_assert_ref(c);
395     pa_assert(data);
396     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
397
398     memcpy(&format, data, sizeof(int32_t));
399     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
400     data = (const char*) data + sizeof(int32_t);
401
402     memcpy(&rate, data, sizeof(int32_t));
403     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
404     data = (const char*) data + sizeof(int32_t);
405
406     ss.rate = (uint32_t) rate;
407     format_esd2native(format, c->swap_byte_order, &ss);
408
409     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification");
410
411     if (c->options->default_sink) {
412         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
413         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
414     }
415
416     pa_strlcpy(name, data, sizeof(name));
417
418     utf8_name = pa_utf8_filter(name);
419     pa_client_set_name(c->client, utf8_name);
420     pa_xfree(utf8_name);
421
422     c->original_name = pa_xstrdup(name);
423
424     pa_assert(!c->sink_input && !c->input_memblockq);
425
426     pa_sink_input_new_data_init(&sdata);
427     sdata.driver = __FILE__;
428     sdata.module = c->options->module;
429     sdata.client = c->client;
430     sdata.sink = sink;
431     pa_sink_input_new_data_set_sample_spec(&sdata, &ss);
432
433     pa_sink_input_new(&c->sink_input, c->protocol->core, &sdata, 0);
434     pa_sink_input_new_data_done(&sdata);
435
436     CHECK_VALIDITY(c->sink_input, "Failed to create sink input.");
437
438     l = (size_t) ((double) pa_bytes_per_second(&ss)*PLAYBACK_BUFFER_SECONDS);
439     c->input_memblockq = pa_memblockq_new(
440             0,
441             l,
442             l,
443             pa_frame_size(&ss),
444             (size_t) -1,
445             l/PLAYBACK_BUFFER_FRAGMENTS,
446             0,
447             NULL);
448     pa_iochannel_socket_set_rcvbuf(c->io, l);
449
450     c->sink_input->parent.process_msg = sink_input_process_msg;
451     c->sink_input->pop = sink_input_pop_cb;
452     c->sink_input->process_rewind = sink_input_process_rewind_cb;
453     c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
454     c->sink_input->kill = sink_input_kill_cb;
455     c->sink_input->userdata = c;
456
457     pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY);
458
459     c->state = ESD_STREAMING_DATA;
460
461     c->protocol->n_player++;
462
463     pa_atomic_store(&c->playback.missing, (int) pa_memblockq_missing(c->input_memblockq));
464
465     pa_sink_input_put(c->sink_input);
466
467     return 0;
468 }
469
470 static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length) {
471     char name[ESD_NAME_MAX], *utf8_name;
472     int32_t format, rate;
473     pa_source *source = NULL;
474     pa_sample_spec ss;
475     size_t l;
476     pa_source_output_new_data sdata;
477
478     connection_assert_ref(c);
479     pa_assert(data);
480     pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));
481
482     memcpy(&format, data, sizeof(int32_t));
483     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
484     data = (const char*) data + sizeof(int32_t);
485
486     memcpy(&rate, data, sizeof(int32_t));
487     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
488     data = (const char*) data + sizeof(int32_t);
489
490     ss.rate = (uint32_t) rate;
491     format_esd2native(format, c->swap_byte_order, &ss);
492
493     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
494
495     if (request == ESD_PROTO_STREAM_MON) {
496         pa_sink* sink;
497
498         sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK);
499         CHECK_VALIDITY(sink, "No such sink: %s", pa_strnull(c->options->default_sink));
500
501         source = sink->monitor_source;
502         CHECK_VALIDITY(source, "No such source.");
503     } else {
504         pa_assert(request == ESD_PROTO_STREAM_REC);
505
506         if (c->options->default_source) {
507             source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE);
508             CHECK_VALIDITY(source, "No such source: %s", pa_strnull(c->options->default_source));
509         }
510     }
511
512     pa_strlcpy(name, data, sizeof(name));
513
514     utf8_name = pa_utf8_filter(name);
515     pa_client_set_name(c->client, utf8_name);
516     pa_xfree(utf8_name);
517
518     c->original_name = pa_xstrdup(name);
519
520     pa_assert(!c->output_memblockq && !c->source_output);
521
522     pa_source_output_new_data_init(&sdata);
523     sdata.driver = __FILE__;
524     sdata.module = c->options->module;
525     sdata.client = c->client;
526     sdata.source = source;
527     pa_source_output_new_data_set_sample_spec(&sdata, &ss);
528
529     pa_source_output_new(&c->source_output, c->protocol->core, &sdata, 0);
530     pa_source_output_new_data_done(&sdata);
531
532     CHECK_VALIDITY(c->source_output, "Failed to create source output.");
533
534     l = (size_t) (pa_bytes_per_second(&ss)*RECORD_BUFFER_SECONDS);
535     c->output_memblockq = pa_memblockq_new(
536             0,
537             l,
538             l,
539             pa_frame_size(&ss),
540             1,
541             0,
542             0,
543             NULL);
544     pa_iochannel_socket_set_sndbuf(c->io, l);
545
546     c->source_output->push = source_output_push_cb;
547     c->source_output->kill = source_output_kill_cb;
548     c->source_output->get_latency = source_output_get_latency_cb;
549     c->source_output->userdata = c;
550
551     pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY);
552
553     c->state = ESD_STREAMING_DATA;
554
555     c->protocol->n_player++;
556
557     pa_source_output_put(c->source_output);
558
559     return 0;
560 }
561
562 static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length) {
563     pa_sink *sink;
564     int32_t latency;
565
566     connection_assert_ref(c);
567     pa_assert(!data);
568     pa_assert(length == 0);
569
570     if (!(sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
571         latency = 0;
572     else {
573         double usec = (double) pa_sink_get_requested_latency(sink);
574         latency = (int) ((usec*44100)/1000000);
575     }
576
577     latency = PA_MAYBE_INT32_SWAP(c->swap_byte_order, latency);
578     connection_write(c, &latency, sizeof(int32_t));
579
580     return 0;
581 }
582
583 static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length) {
584     int32_t rate = 44100, format = ESD_STEREO|ESD_BITS16;
585     int32_t response;
586     pa_sink *sink;
587
588     connection_assert_ref(c);
589     pa_assert(data);
590     pa_assert(length == sizeof(int32_t));
591
592     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK))) {
593         rate = (int32_t) sink->sample_spec.rate;
594         format = format_native2esd(&sink->sample_spec);
595     }
596
597     connection_write_prepare(c, sizeof(int32_t) * 3);
598
599     response = 0;
600     connection_write(c, &response, sizeof(int32_t));
601     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
602     connection_write(c, &rate, sizeof(int32_t));
603     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
604     connection_write(c, &format, sizeof(int32_t));
605
606     return 0;
607 }
608
609 static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length) {
610     size_t t, k, s;
611     connection *conn;
612     uint32_t idx = PA_IDXSET_INVALID;
613     unsigned nsamples;
614     char terminator[sizeof(int32_t)*6+ESD_NAME_MAX];
615
616     connection_assert_ref(c);
617     pa_assert(data);
618     pa_assert(length == sizeof(int32_t));
619
620     if (esd_proto_server_info(c, request, data, length) < 0)
621         return -1;
622
623     k = sizeof(int32_t)*5+ESD_NAME_MAX;
624     s = sizeof(int32_t)*6+ESD_NAME_MAX;
625     nsamples = pa_idxset_size(c->protocol->core->scache);
626     t = s*(nsamples+1) + k*(c->protocol->n_player+1);
627
628     connection_write_prepare(c, t);
629
630     memset(terminator, 0, sizeof(terminator));
631
632     for (conn = pa_idxset_first(c->protocol->connections, &idx); conn; conn = pa_idxset_next(c->protocol->connections, &idx)) {
633         int32_t id, format = ESD_BITS16 | ESD_STEREO, rate = 44100, lvolume = ESD_VOLUME_BASE, rvolume = ESD_VOLUME_BASE;
634         char name[ESD_NAME_MAX];
635
636         if (conn->state != ESD_STREAMING_DATA)
637             continue;
638
639         pa_assert(t >= k*2+s);
640
641         if (conn->sink_input) {
642             pa_cvolume volume;
643             pa_sink_input_get_volume(conn->sink_input, &volume, TRUE);
644             rate = (int32_t) conn->sink_input->sample_spec.rate;
645             lvolume = (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
646             rvolume = (int32_t) ((volume.values[volume.channels == 2 ? 1 : 0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM);
647             format = format_native2esd(&conn->sink_input->sample_spec);
648         }
649
650         /* id */
651         id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) (conn->index+1));
652         connection_write(c, &id, sizeof(int32_t));
653
654         /* name */
655         memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
656         if (conn->original_name)
657             strncpy(name, conn->original_name, ESD_NAME_MAX);
658         else if (conn->client && pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME))
659             strncpy(name, pa_proplist_gets(conn->client->proplist, PA_PROP_APPLICATION_NAME), ESD_NAME_MAX);
660         connection_write(c, name, ESD_NAME_MAX);
661
662         /* rate */
663         rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
664         connection_write(c, &rate, sizeof(int32_t));
665
666         /* left */
667         lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, lvolume);
668         connection_write(c, &lvolume, sizeof(int32_t));
669
670         /*right*/
671         rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rvolume);
672         connection_write(c, &rvolume, sizeof(int32_t));
673
674         /*format*/
675         format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
676         connection_write(c, &format, sizeof(int32_t));
677
678         t -= k;
679     }
680
681     pa_assert(t == s*(nsamples+1)+k);
682     t -= k;
683
684     connection_write(c, terminator, k);
685
686     if (nsamples) {
687         pa_scache_entry *ce;
688
689         idx = PA_IDXSET_INVALID;
690         for (ce = pa_idxset_first(c->protocol->core->scache, &idx); ce; ce = pa_idxset_next(c->protocol->core->scache, &idx)) {
691             int32_t id, rate, lvolume, rvolume, format, len;
692             char name[ESD_NAME_MAX];
693             pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
694             pa_cvolume volume;
695             pa_sample_spec ss;
696
697             pa_assert(t >= s*2);
698
699             if (ce->volume_is_set) {
700                 volume = ce->volume;
701                 pa_cvolume_remap(&volume, &ce->channel_map, &stereo);
702             } else
703                 pa_cvolume_reset(&volume, 2);
704
705             if (ce->memchunk.memblock)
706                 ss = ce->sample_spec;
707             else {
708                 ss.format = PA_SAMPLE_S16NE;
709                 ss.rate = 44100;
710                 ss.channels = 2;
711             }
712
713             /* id */
714             id = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1));
715             connection_write(c, &id, sizeof(int32_t));
716
717             /* name */
718             memset(name, 0, ESD_NAME_MAX); /* don't leak old data */
719             if (strncmp(ce->name, SCACHE_PREFIX, sizeof(SCACHE_PREFIX)-1) == 0)
720                 strncpy(name, ce->name+sizeof(SCACHE_PREFIX)-1, ESD_NAME_MAX);
721             else
722                 pa_snprintf(name, ESD_NAME_MAX, "native.%s", ce->name);
723             connection_write(c, name, ESD_NAME_MAX);
724
725             /* rate */
726             rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ss.rate);
727             connection_write(c, &rate, sizeof(int32_t));
728
729             /* left */
730             lvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[0]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
731             connection_write(c, &lvolume, sizeof(int32_t));
732
733             /*right*/
734             rvolume = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int32_t) ((volume.values[1]*ESD_VOLUME_BASE)/PA_VOLUME_NORM));
735             connection_write(c, &rvolume, sizeof(int32_t));
736
737             /*format*/
738             format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format_native2esd(&ss));
739             connection_write(c, &format, sizeof(int32_t));
740
741             /*length*/
742             len = PA_MAYBE_INT32_SWAP(c->swap_byte_order, (int) ce->memchunk.length);
743             connection_write(c, &len, sizeof(int32_t));
744
745             t -= s;
746         }
747     }
748
749     pa_assert(t == s);
750
751     connection_write(c, terminator, s);
752
753     return 0;
754 }
755
756 static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
757     int32_t ok;
758     uint32_t idx, lvolume, rvolume;
759     connection *conn;
760
761     connection_assert_ref(c);
762     pa_assert(data);
763     pa_assert(length == sizeof(int32_t)*3);
764
765     memcpy(&idx, data, sizeof(uint32_t));
766     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
767     data = (const char*)data + sizeof(uint32_t);
768
769     memcpy(&lvolume, data, sizeof(uint32_t));
770     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
771     data = (const char*)data + sizeof(uint32_t);
772
773     memcpy(&rvolume, data, sizeof(uint32_t));
774     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
775     data = (const char*)data + sizeof(uint32_t);
776
777     if ((conn = pa_idxset_get_by_index(c->protocol->connections, idx)) && conn->sink_input) {
778         pa_cvolume volume;
779         volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
780         volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
781         volume.channels = conn->sink_input->sample_spec.channels;
782
783         pa_sink_input_set_volume(conn->sink_input, &volume, TRUE, TRUE);
784         ok = 1;
785     } else
786         ok = 0;
787
788     connection_write(c, &ok, sizeof(int32_t));
789
790     return 0;
791 }
792
793 static int esd_proto_sample_pan(connection *c, esd_proto_t request, const void *data, size_t length) {
794     int32_t ok = 0;
795     uint32_t idx, lvolume, rvolume;
796     pa_cvolume volume;
797     pa_scache_entry *ce;
798
799     connection_assert_ref(c);
800     pa_assert(data);
801     pa_assert(length == sizeof(int32_t)*3);
802
803     memcpy(&idx, data, sizeof(uint32_t));
804     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
805     data = (const char*)data + sizeof(uint32_t);
806
807     memcpy(&lvolume, data, sizeof(uint32_t));
808     lvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, lvolume);
809     data = (const char*)data + sizeof(uint32_t);
810
811     memcpy(&rvolume, data, sizeof(uint32_t));
812     rvolume = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, rvolume);
813     data = (const char*)data + sizeof(uint32_t);
814
815     volume.values[0] = (lvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
816     volume.values[1] = (rvolume*PA_VOLUME_NORM)/ESD_VOLUME_BASE;
817     volume.channels = 2;
818
819     if ((ce = pa_idxset_get_by_index(c->protocol->core->scache, idx))) {
820         pa_channel_map stereo = { .channels = 2, .map = { PA_CHANNEL_POSITION_LEFT, PA_CHANNEL_POSITION_RIGHT } };
821
822         pa_cvolume_remap(&volume, &stereo, &ce->channel_map);
823         ce->volume = volume;
824         ce->volume_is_set = TRUE;
825         ok = 1;
826     }
827
828     connection_write(c, &ok, sizeof(int32_t));
829
830     return 0;
831 }
832
833 static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length) {
834     pa_sample_spec ss;
835     int32_t format, rate, sc_length;
836     uint32_t idx;
837     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
838
839     connection_assert_ref(c);
840     pa_assert(data);
841     pa_assert(length == (ESD_NAME_MAX+3*sizeof(int32_t)));
842
843     memcpy(&format, data, sizeof(int32_t));
844     format = PA_MAYBE_INT32_SWAP(c->swap_byte_order, format);
845     data = (const char*)data + sizeof(int32_t);
846
847     memcpy(&rate, data, sizeof(int32_t));
848     rate = PA_MAYBE_INT32_SWAP(c->swap_byte_order, rate);
849     data = (const char*)data + sizeof(int32_t);
850
851     ss.rate = (uint32_t) rate;
852     format_esd2native(format, c->swap_byte_order, &ss);
853
854     CHECK_VALIDITY(pa_sample_spec_valid(&ss), "Invalid sample specification.");
855
856     memcpy(&sc_length, data, sizeof(int32_t));
857     sc_length = PA_MAYBE_INT32_SWAP(c->swap_byte_order, sc_length);
858     data = (const char*)data + sizeof(int32_t);
859
860     CHECK_VALIDITY(sc_length <= MAX_CACHE_SAMPLE_SIZE, "Sample too large (%d bytes).", (int)sc_length);
861
862     strcpy(name, SCACHE_PREFIX);
863     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
864
865     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
866
867     pa_assert(!c->scache.memchunk.memblock);
868     c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) sc_length);
869     c->scache.memchunk.index = 0;
870     c->scache.memchunk.length = (size_t) sc_length;
871     c->scache.sample_spec = ss;
872     pa_assert(!c->scache.name);
873     c->scache.name = pa_xstrdup(name);
874
875     c->state = ESD_CACHING_SAMPLE;
876
877     pa_scache_add_item(c->protocol->core, c->scache.name, NULL, NULL, NULL, c->client->proplist, &idx);
878
879     idx += 1;
880     connection_write(c, &idx, sizeof(uint32_t));
881
882     return 0;
883 }
884
885 static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length) {
886     int32_t ok;
887     uint32_t idx;
888     char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1];
889
890     connection_assert_ref(c);
891     pa_assert(data);
892     pa_assert(length == ESD_NAME_MAX);
893
894     strcpy(name, SCACHE_PREFIX);
895     pa_strlcpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX);
896
897     CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name.");
898
899     ok = -1;
900     if ((idx = pa_scache_get_id_by_name(c->protocol->core, name)) != PA_IDXSET_INVALID)
901         ok = (int32_t) idx + 1;
902
903     connection_write(c, &ok, sizeof(int32_t));
904
905     return 0;
906 }
907
908 static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length) {
909     int32_t ok;
910     const char *name;
911     uint32_t idx;
912
913     connection_assert_ref(c);
914     pa_assert(data);
915     pa_assert(length == sizeof(int32_t));
916
917     memcpy(&idx, data, sizeof(uint32_t));
918     idx = PA_MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1;
919
920     ok = 0;
921
922     if ((name = pa_scache_get_name_by_id(c->protocol->core, idx))) {
923         if (request == ESD_PROTO_SAMPLE_PLAY) {
924             pa_sink *sink;
925
926             if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
927                 if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM, c->client->proplist, NULL) >= 0)
928                     ok = (int32_t) idx + 1;
929         } else {
930             pa_assert(request == ESD_PROTO_SAMPLE_FREE);
931
932             if (pa_scache_remove_item(c->protocol->core, name) >= 0)
933                 ok = (int32_t) idx + 1;
934         }
935     }
936
937     connection_write(c, &ok, sizeof(int32_t));
938
939     return 0;
940 }
941
942 static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length) {
943     int32_t ok = 1;
944
945     connection_assert_ref(c);
946
947     connection_write_prepare(c, sizeof(int32_t) * 2);
948     connection_write(c, &ok, sizeof(int32_t));
949
950     if (request == ESD_PROTO_STANDBY)
951         ok = pa_sink_suspend_all(c->protocol->core, TRUE, PA_SUSPEND_USER) >= 0;
952     else {
953         pa_assert(request == ESD_PROTO_RESUME);
954         ok = pa_sink_suspend_all(c->protocol->core, FALSE, PA_SUSPEND_USER) >= 0;
955     }
956
957     connection_write(c, &ok, sizeof(int32_t));
958
959     return 0;
960 }
961
962 static int esd_proto_standby_mode(connection *c, esd_proto_t request, const void *data, size_t length) {
963     int32_t mode;
964     pa_sink *sink, *source;
965
966     connection_assert_ref(c);
967
968     mode = ESM_RUNNING;
969
970     if ((sink = pa_namereg_get(c->protocol->core, c->options->default_sink, PA_NAMEREG_SINK)))
971         if (pa_sink_get_state(sink) == PA_SINK_SUSPENDED)
972             mode = ESM_ON_STANDBY;
973
974     if ((source = pa_namereg_get(c->protocol->core, c->options->default_source, PA_NAMEREG_SOURCE)))
975         if (pa_source_get_state(source) == PA_SOURCE_SUSPENDED)
976             mode = ESM_ON_STANDBY;
977
978     mode = PA_MAYBE_INT32_SWAP(c->swap_byte_order, mode);
979
980     connection_write(c, &mode, sizeof(mode));
981     return 0;
982 }
983
984 /*** client callbacks ***/
985
986 static void client_kill_cb(pa_client *c) {
987     pa_assert(c);
988
989     connection_unlink(CONNECTION(c->userdata));
990 }
991
992 /*** pa_iochannel callbacks ***/
993
994 static int do_read(connection *c) {
995     connection_assert_ref(c);
996
997 /*     pa_log("READ"); */
998
999     if (c->state == ESD_NEXT_REQUEST) {
1000         ssize_t r;
1001         pa_assert(c->read_data_length < sizeof(c->request));
1002
1003         if ((r = pa_iochannel_read(c->io,
1004                                    ((uint8_t*) &c->request) + c->read_data_length,
1005                                    sizeof(c->request) - c->read_data_length)) <= 0) {
1006
1007             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1008                 return 0;
1009
1010             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1011             return -1;
1012         }
1013
1014         c->read_data_length += (size_t) r;
1015
1016         if (c->read_data_length >= sizeof(c->request)) {
1017             struct proto_handler *handler;
1018
1019             c->request = PA_MAYBE_INT32_SWAP(c->swap_byte_order, c->request);
1020
1021             if (c->request < ESD_PROTO_CONNECT || c->request >= ESD_PROTO_MAX) {
1022                 pa_log("recieved invalid request.");
1023                 return -1;
1024             }
1025
1026             handler = proto_map+c->request;
1027
1028 /*             pa_log("executing request #%u", c->request); */
1029
1030             if (!handler->proc) {
1031                 pa_log("recieved unimplemented request #%u.", c->request);
1032                 return -1;
1033             }
1034
1035             if (handler->data_length == 0) {
1036                 c->read_data_length = 0;
1037
1038                 if (handler->proc(c, c->request, NULL, 0) < 0)
1039                     return -1;
1040
1041             } else {
1042                 if (c->read_data_alloc < handler->data_length)
1043                     c->read_data = pa_xrealloc(c->read_data, c->read_data_alloc = handler->data_length);
1044                 pa_assert(c->read_data);
1045
1046                 c->state = ESD_NEEDS_REQDATA;
1047                 c->read_data_length = 0;
1048             }
1049         }
1050
1051     } else if (c->state == ESD_NEEDS_REQDATA) {
1052         ssize_t r;
1053         struct proto_handler *handler = proto_map+c->request;
1054
1055         pa_assert(handler->proc);
1056
1057         pa_assert(c->read_data && c->read_data_length < handler->data_length);
1058
1059         if ((r = pa_iochannel_read(c->io,
1060                                    (uint8_t*) c->read_data + c->read_data_length,
1061                                    handler->data_length - c->read_data_length)) <= 0) {
1062
1063             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1064                 return 0;
1065
1066             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1067             return -1;
1068         }
1069
1070         c->read_data_length += (size_t) r;
1071         if (c->read_data_length >= handler->data_length) {
1072             size_t l = c->read_data_length;
1073             pa_assert(handler->proc);
1074
1075             c->state = ESD_NEXT_REQUEST;
1076             c->read_data_length = 0;
1077
1078             if (handler->proc(c, c->request, c->read_data, l) < 0)
1079                 return -1;
1080         }
1081     } else if (c->state == ESD_CACHING_SAMPLE) {
1082         ssize_t r;
1083         void *p;
1084
1085         pa_assert(c->scache.memchunk.memblock);
1086         pa_assert(c->scache.name);
1087         pa_assert(c->scache.memchunk.index < c->scache.memchunk.length);
1088
1089         p = pa_memblock_acquire(c->scache.memchunk.memblock);
1090         r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index);
1091         pa_memblock_release(c->scache.memchunk.memblock);
1092
1093         if (r <= 0) {
1094             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1095                 return 0;
1096
1097             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1098             return -1;
1099         }
1100
1101         c->scache.memchunk.index += (size_t) r;
1102         pa_assert(c->scache.memchunk.index <= c->scache.memchunk.length);
1103
1104         if (c->scache.memchunk.index == c->scache.memchunk.length) {
1105             uint32_t idx;
1106
1107             c->scache.memchunk.index = 0;
1108             pa_scache_add_item(c->protocol->core, c->scache.name, &c->scache.sample_spec, NULL, &c->scache.memchunk, c->client->proplist, &idx);
1109
1110             pa_memblock_unref(c->scache.memchunk.memblock);
1111             pa_memchunk_reset(&c->scache.memchunk);
1112
1113             pa_xfree(c->scache.name);
1114             c->scache.name = NULL;
1115
1116             c->state = ESD_NEXT_REQUEST;
1117
1118             idx += 1;
1119             connection_write(c, &idx, sizeof(uint32_t));
1120         }
1121
1122     } else if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1123         pa_memchunk chunk;
1124         ssize_t r;
1125         size_t l;
1126         void *p;
1127         size_t space;
1128
1129         pa_assert(c->input_memblockq);
1130
1131 /*         pa_log("STREAMING_DATA"); */
1132
1133         if (!(l = (size_t) pa_atomic_load(&c->playback.missing)))
1134             return 0;
1135
1136         if (c->playback.current_memblock) {
1137
1138             space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index;
1139
1140             if (space <= 0) {
1141                 pa_memblock_unref(c->playback.current_memblock);
1142                 c->playback.current_memblock = NULL;
1143             }
1144         }
1145
1146         if (!c->playback.current_memblock) {
1147             pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, (size_t) -1));
1148             c->playback.memblock_index = 0;
1149
1150             space = pa_memblock_get_length(c->playback.current_memblock);
1151         }
1152
1153         if (l > space)
1154             l = space;
1155
1156         p = pa_memblock_acquire(c->playback.current_memblock);
1157         r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l);
1158         pa_memblock_release(c->playback.current_memblock);
1159
1160         if (r <= 0) {
1161
1162             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1163                 return 0;
1164
1165             pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
1166             return -1;
1167         }
1168
1169         chunk.memblock = c->playback.current_memblock;
1170         chunk.index = c->playback.memblock_index;
1171         chunk.length = (size_t) r;
1172
1173         c->playback.memblock_index += (size_t) r;
1174
1175         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
1176         pa_atomic_sub(&c->playback.missing, (int) r);
1177     }
1178
1179     return 0;
1180 }
1181
1182 static int do_write(connection *c) {
1183     connection_assert_ref(c);
1184
1185 /*     pa_log("WRITE"); */
1186
1187     if (c->write_data_length) {
1188         ssize_t r;
1189
1190         pa_assert(c->write_data_index < c->write_data_length);
1191         if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) {
1192
1193             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1194                 return 0;
1195
1196             pa_log("write(): %s", pa_cstrerror(errno));
1197             return -1;
1198         }
1199
1200         c->write_data_index += (size_t) r;
1201         if (c->write_data_index >= c->write_data_length)
1202             c->write_data_length = c->write_data_index = 0;
1203
1204     } else if (c->state == ESD_STREAMING_DATA && c->source_output) {
1205         pa_memchunk chunk;
1206         ssize_t r;
1207         void *p;
1208
1209         if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
1210             return 0;
1211
1212         pa_assert(chunk.memblock);
1213         pa_assert(chunk.length);
1214
1215         p = pa_memblock_acquire(chunk.memblock);
1216         r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
1217         pa_memblock_release(chunk.memblock);
1218
1219         pa_memblock_unref(chunk.memblock);
1220
1221         if (r < 0) {
1222
1223             if (r < 0 && (errno == EINTR || errno == EAGAIN))
1224                 return 0;
1225
1226             pa_log("write(): %s", pa_cstrerror(errno));
1227             return -1;
1228         }
1229
1230         pa_memblockq_drop(c->output_memblockq, (size_t) r);
1231     }
1232
1233     return 0;
1234 }
1235
1236 static void do_work(connection *c) {
1237     connection_assert_ref(c);
1238
1239     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
1240
1241     if (c->dead)
1242         return;
1243
1244     if (pa_iochannel_is_readable(c->io))
1245         if (do_read(c) < 0)
1246             goto fail;
1247
1248     if (c->state == ESD_STREAMING_DATA && !c->sink_input && pa_iochannel_is_hungup(c->io))
1249         /* In case we are in capture mode we will never call read()
1250          * on the socket, hence we need to detect the hangup manually
1251          * here, instead of simply waiting for read() to return 0. */
1252         goto fail;
1253
1254     if (pa_iochannel_is_writable(c->io))
1255         if (do_write(c) < 0)
1256             goto fail;
1257
1258     return;
1259
1260 fail:
1261
1262     if (c->state == ESD_STREAMING_DATA && c->sink_input) {
1263         c->dead = TRUE;
1264
1265         pa_iochannel_free(c->io);
1266         c->io = NULL;
1267
1268         pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
1269     } else
1270         connection_unlink(c);
1271 }
1272
1273 static void io_callback(pa_iochannel*io, void *userdata) {
1274     connection *c = CONNECTION(userdata);
1275
1276     connection_assert_ref(c);
1277     pa_assert(io);
1278
1279     do_work(c);
1280 }
1281
1282 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
1283     connection *c = CONNECTION(userdata);
1284
1285     connection_assert_ref(c);
1286     pa_assert(e);
1287
1288     do_work(c);
1289 }
1290
1291 static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
1292     connection *c = CONNECTION(o);
1293     connection_assert_ref(c);
1294
1295     if (!c->protocol)
1296         return -1;
1297
1298     switch (code) {
1299         case CONNECTION_MESSAGE_REQUEST_DATA:
1300             do_work(c);
1301             break;
1302
1303         case CONNECTION_MESSAGE_POST_DATA:
1304 /*             pa_log("got data %u", chunk->length); */
1305             pa_memblockq_push_align(c->output_memblockq, chunk);
1306             do_work(c);
1307             break;
1308
1309         case CONNECTION_MESSAGE_UNLINK_CONNECTION:
1310             connection_unlink(c);
1311             break;
1312     }
1313
1314     return 0;
1315 }
1316
1317 /*** sink_input callbacks ***/
1318
1319 /* Called from thread context */
1320 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1321     pa_sink_input *i = PA_SINK_INPUT(o);
1322     connection*c;
1323
1324     pa_sink_input_assert_ref(i);
1325     c = CONNECTION(i->userdata);
1326     connection_assert_ref(c);
1327
1328     switch (code) {
1329
1330         case SINK_INPUT_MESSAGE_POST_DATA: {
1331             pa_assert(chunk);
1332
1333             /* New data from the main loop */
1334             pa_memblockq_push_align(c->input_memblockq, chunk);
1335
1336             if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) {
1337                 pa_log_debug("Requesting rewind due to end of underrun.");
1338                 pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE, FALSE);
1339             }
1340
1341 /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
1342
1343             return 0;
1344         }
1345
1346         case SINK_INPUT_MESSAGE_DISABLE_PREBUF:
1347             pa_memblockq_prebuf_disable(c->input_memblockq);
1348             return 0;
1349
1350         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
1351             pa_usec_t *r = userdata;
1352
1353             *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
1354
1355             /* Fall through, the default handler will add in the extra
1356              * latency added by the resampler */
1357         }
1358
1359         default:
1360             return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
1361     }
1362 }
1363
1364 /* Called from thread context */
1365 static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {
1366     connection*c;
1367
1368     pa_sink_input_assert_ref(i);
1369     c = CONNECTION(i->userdata);
1370     connection_assert_ref(c);
1371     pa_assert(chunk);
1372
1373     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
1374
1375         c->playback.underrun = TRUE;
1376
1377         if (c->dead && pa_sink_input_safe_to_remove(i))
1378             pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL);
1379
1380         return -1;
1381     } else {
1382         size_t m;
1383
1384         chunk->length = PA_MIN(length, chunk->length);
1385
1386         c->playback.underrun = FALSE;
1387
1388         pa_memblockq_drop(c->input_memblockq, chunk->length);
1389         m = pa_memblockq_pop_missing(c->input_memblockq);
1390
1391         if (m > 0)
1392             if (pa_atomic_add(&c->playback.missing, (int) m) <= 0)
1393                 pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
1394
1395         return 0;
1396     }
1397 }
1398
1399 /* Called from thread context */
1400 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1401     connection *c;
1402
1403     pa_sink_input_assert_ref(i);
1404     c = CONNECTION(i->userdata);
1405     connection_assert_ref(c);
1406
1407     /* If we are in an underrun, then we don't rewind */
1408     if (i->thread_info.underrun_for > 0)
1409         return;
1410
1411     pa_memblockq_rewind(c->input_memblockq, nbytes);
1412 }
1413
1414 /* Called from thread context */
1415 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1416     connection *c;
1417
1418     pa_sink_input_assert_ref(i);
1419     c = CONNECTION(i->userdata);
1420     connection_assert_ref(c);
1421
1422     pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);
1423 }
1424
1425 static void sink_input_kill_cb(pa_sink_input *i) {
1426     pa_sink_input_assert_ref(i);
1427
1428     connection_unlink(CONNECTION(i->userdata));
1429 }
1430
1431 /*** source_output callbacks ***/
1432
1433 /* Called from thread context */
1434 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
1435     connection *c;
1436
1437     pa_source_output_assert_ref(o);
1438     c = CONNECTION(o->userdata);
1439     pa_assert(c);
1440     pa_assert(chunk);
1441
1442     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
1443 }
1444
1445 static void source_output_kill_cb(pa_source_output *o) {
1446     pa_source_output_assert_ref(o);
1447
1448     connection_unlink(CONNECTION(o->userdata));
1449 }
1450
1451 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
1452     connection*c;
1453
1454     pa_source_output_assert_ref(o);
1455     c = CONNECTION(o->userdata);
1456     pa_assert(c);
1457
1458     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
1459 }
1460
1461 /*** entry points ***/
1462
1463 static void auth_timeout(pa_mainloop_api *m, pa_time_event *e, const struct timeval *t, void *userdata) {
1464     connection *c = CONNECTION(userdata);
1465
1466     pa_assert(m);
1467     connection_assert_ref(c);
1468     pa_assert(c->auth_timeout_event == e);
1469
1470     if (!c->authorized)
1471         connection_unlink(c);
1472 }
1473
1474 void pa_esound_protocol_connect(pa_esound_protocol *p, pa_iochannel *io, pa_esound_options *o) {
1475     connection *c;
1476     char pname[128];
1477     pa_client_new_data data;
1478     pa_client *client;
1479
1480     pa_assert(p);
1481     pa_assert(io);
1482     pa_assert(o);
1483
1484     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
1485         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
1486         pa_iochannel_free(io);
1487         return;
1488     }
1489
1490     pa_client_new_data_init(&data);
1491     data.module = o->module;
1492     data.driver = __FILE__;
1493     pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));
1494     pa_proplist_setf(data.proplist, PA_PROP_APPLICATION_NAME, "EsounD client (%s)", pname);
1495     pa_proplist_sets(data.proplist, "esound-protocol.peer", pname);
1496     client = pa_client_new(p->core, &data);
1497     pa_client_new_data_done(&data);
1498
1499     if (!client)
1500         return;
1501
1502     c = pa_msgobject_new(connection);
1503     c->parent.parent.free = connection_free;
1504     c->parent.process_msg = connection_process_msg;
1505     c->protocol = p;
1506     c->io = io;
1507     pa_iochannel_set_callback(c->io, io_callback, c);
1508
1509     c->client = client;
1510     c->client->kill = client_kill_cb;
1511     c->client->userdata = c;
1512
1513     c->options = pa_esound_options_ref(o);
1514     c->authorized = FALSE;
1515     c->swap_byte_order = FALSE;
1516     c->dead = FALSE;
1517
1518     c->read_data_length = 0;
1519     c->read_data = pa_xmalloc(c->read_data_alloc = proto_map[ESD_PROTO_CONNECT].data_length);
1520
1521     c->write_data_length = c->write_data_index = c->write_data_alloc = 0;
1522     c->write_data = NULL;
1523
1524     c->state = ESD_NEEDS_REQDATA;
1525     c->request = ESD_PROTO_CONNECT;
1526
1527     c->sink_input = NULL;
1528     c->input_memblockq = NULL;
1529
1530     c->source_output = NULL;
1531     c->output_memblockq = NULL;
1532
1533     c->playback.current_memblock = NULL;
1534     c->playback.memblock_index = 0;
1535     c->playback.underrun = TRUE;
1536     pa_atomic_store(&c->playback.missing, 0);
1537
1538     pa_memchunk_reset(&c->scache.memchunk);
1539     c->scache.name = NULL;
1540
1541     c->original_name = NULL;
1542
1543     if (o->auth_anonymous) {
1544         pa_log_info("Client authenticated anonymously.");
1545         c->authorized = TRUE;
1546     }
1547
1548     if (!c->authorized &&
1549         o->auth_ip_acl &&
1550         pa_ip_acl_check(o->auth_ip_acl, pa_iochannel_get_recv_fd(io)) > 0) {
1551
1552         pa_log_info("Client authenticated by IP ACL.");
1553         c->authorized = TRUE;
1554     }
1555
1556     if (!c->authorized)
1557         c->auth_timeout_event = pa_core_rttime_new(p->core, pa_rtclock_now() + AUTH_TIMEOUT, auth_timeout, c);
1558     else
1559         c->auth_timeout_event = NULL;
1560
1561     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
1562     p->core->mainloop->defer_enable(c->defer_event, 0);
1563
1564     pa_idxset_put(p->connections, c, &c->index);
1565 }
1566
1567 void pa_esound_protocol_disconnect(pa_esound_protocol *p, pa_module *m) {
1568     connection *c;
1569     void *state = NULL;
1570
1571     pa_assert(p);
1572     pa_assert(m);
1573
1574     while ((c = pa_idxset_iterate(p->connections, &state, NULL)))
1575         if (c->options->module == m)
1576             connection_unlink(c);
1577 }
1578
1579 static pa_esound_protocol* esound_protocol_new(pa_core *c) {
1580     pa_esound_protocol *p;
1581
1582     pa_assert(c);
1583
1584     p = pa_xnew(pa_esound_protocol, 1);
1585     PA_REFCNT_INIT(p);
1586     p->core = c;
1587     p->connections = pa_idxset_new(NULL, NULL);
1588     p->n_player = 0;
1589
1590     pa_assert_se(pa_shared_set(c, "esound-protocol", p) >= 0);
1591
1592     return p;
1593 }
1594
1595 pa_esound_protocol* pa_esound_protocol_get(pa_core *c) {
1596     pa_esound_protocol *p;
1597
1598     if ((p = pa_shared_get(c, "esound-protocol")))
1599         return pa_esound_protocol_ref(p);
1600
1601     return esound_protocol_new(c);
1602 }
1603
1604 pa_esound_protocol* pa_esound_protocol_ref(pa_esound_protocol *p) {
1605     pa_assert(p);
1606     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1607
1608     PA_REFCNT_INC(p);
1609
1610     return p;
1611 }
1612
1613 void pa_esound_protocol_unref(pa_esound_protocol *p) {
1614     connection *c;
1615     pa_assert(p);
1616     pa_assert(PA_REFCNT_VALUE(p) >= 1);
1617
1618     if (PA_REFCNT_DEC(p) > 0)
1619         return;
1620
1621     while ((c = pa_idxset_first(p->connections, NULL)))
1622         connection_unlink(c);
1623
1624     pa_idxset_free(p->connections, NULL, NULL);
1625
1626     pa_assert_se(pa_shared_remove(p->core, "esound-protocol") >= 0);
1627
1628     pa_xfree(p);
1629 }
1630
1631 pa_esound_options* pa_esound_options_new(void) {
1632     pa_esound_options *o;
1633
1634     o = pa_xnew0(pa_esound_options, 1);
1635     PA_REFCNT_INIT(o);
1636
1637     return o;
1638 }
1639
1640 pa_esound_options* pa_esound_options_ref(pa_esound_options *o) {
1641     pa_assert(o);
1642     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1643
1644     PA_REFCNT_INC(o);
1645
1646     return o;
1647 }
1648
1649 void pa_esound_options_unref(pa_esound_options *o) {
1650     pa_assert(o);
1651     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1652
1653     if (PA_REFCNT_DEC(o) > 0)
1654         return;
1655
1656     if (o->auth_ip_acl)
1657         pa_ip_acl_free(o->auth_ip_acl);
1658
1659     if (o->auth_cookie)
1660         pa_auth_cookie_unref(o->auth_cookie);
1661
1662     pa_xfree(o->default_sink);
1663     pa_xfree(o->default_source);
1664
1665     pa_xfree(o);
1666 }
1667
1668 int pa_esound_options_parse(pa_esound_options *o, pa_core *c, pa_modargs *ma) {
1669     pa_bool_t enabled;
1670     const char *acl;
1671
1672     pa_assert(o);
1673     pa_assert(PA_REFCNT_VALUE(o) >= 1);
1674     pa_assert(ma);
1675
1676     if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &o->auth_anonymous) < 0) {
1677         pa_log("auth-anonymous= expects a boolean argument.");
1678         return -1;
1679     }
1680
1681     if ((acl = pa_modargs_get_value(ma, "auth-ip-acl", NULL))) {
1682         pa_ip_acl *ipa;
1683
1684         if (!(ipa = pa_ip_acl_new(acl))) {
1685             pa_log("Failed to parse IP ACL '%s'", acl);
1686             return -1;
1687         }
1688
1689         if (o->auth_ip_acl)
1690             pa_ip_acl_free(o->auth_ip_acl);
1691
1692         o->auth_ip_acl = ipa;
1693     }
1694
1695     enabled = TRUE;
1696     if (pa_modargs_get_value_boolean(ma, "auth-cookie-enabled", &enabled) < 0) {
1697         pa_log("auth-cookie-enabled= expects a boolean argument.");
1698         return -1;
1699     }
1700
1701     if (o->auth_cookie)
1702         pa_auth_cookie_unref(o->auth_cookie);
1703
1704     if (enabled) {
1705         const char *cn;
1706
1707         /* The new name for this is 'auth-cookie', for compat reasons
1708          * we check the old name too */
1709         if (!(cn = pa_modargs_get_value(ma, "auth-cookie", NULL)))
1710             if (!(cn = pa_modargs_get_value(ma, "cookie", NULL)))
1711                 cn = DEFAULT_COOKIE_FILE;
1712
1713         if (!(o->auth_cookie = pa_auth_cookie_get(c, cn, ESD_KEY_LEN)))
1714             return -1;
1715
1716     } else
1717         o->auth_cookie = NULL;
1718
1719     pa_xfree(o->default_sink);
1720     o->default_sink = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
1721
1722     pa_xfree(o->default_source);
1723     o->default_source = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
1724
1725     return 0;
1726 }