Revert r1404 and keep it on a development branch until it is fully tested.
[profile/ivi/pulseaudio-panda.git] / src / pulsecore / protocol-simple.c
1 /* $Id$ */
2
3 /***
4   This file is part of PulseAudio.
5  
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <stdio.h>
30 #include <errno.h>
31 #include <string.h>
32
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/sink-input.h>
36 #include <pulsecore/source-output.h>
37 #include <pulsecore/client.h>
38 #include <pulsecore/sample-util.h>
39 #include <pulsecore/namereg.h>
40 #include <pulsecore/log.h>
41 #include <pulsecore/core-error.h>
42
43 #include "protocol-simple.h"
44
45 /* Don't allow more than this many concurrent connections */
46 #define MAX_CONNECTIONS 10
47
48 struct connection {
49     pa_protocol_simple *protocol;
50     pa_iochannel *io;
51     pa_sink_input *sink_input;
52     pa_source_output *source_output;
53     pa_client *client;
54     pa_memblockq *input_memblockq, *output_memblockq;
55     pa_defer_event *defer_event;
56
57     int dead;
58     
59     struct {
60         pa_memblock *current_memblock;
61         size_t memblock_index, fragment_size;
62     } playback;
63 };
64
65 struct pa_protocol_simple {
66     pa_module *module;
67     pa_core *core;
68     pa_socket_server*server;
69     pa_idxset *connections;
70     enum {
71         RECORD = 1,
72         PLAYBACK = 2,
73         DUPLEX = 3
74     } mode;
75     pa_sample_spec sample_spec;
76     char *source_name, *sink_name;
77 };
78
79 #define PLAYBACK_BUFFER_SECONDS (.5)
80 #define PLAYBACK_BUFFER_FRAGMENTS (10)
81 #define RECORD_BUFFER_SECONDS (5)
82 #define RECORD_BUFFER_FRAGMENTS (100)
83
84 static void connection_free(struct connection *c) {
85     assert(c);
86
87     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
88
89     if (c->playback.current_memblock)
90         pa_memblock_unref(c->playback.current_memblock);
91     if (c->sink_input) {
92         pa_sink_input_disconnect(c->sink_input);
93         pa_sink_input_unref(c->sink_input);
94     }
95     if (c->source_output) {
96         pa_source_output_disconnect(c->source_output);
97         pa_source_output_unref(c->source_output);
98     }
99     if (c->client)
100         pa_client_free(c->client);
101     if (c->io)
102         pa_iochannel_free(c->io);
103     if (c->input_memblockq)
104         pa_memblockq_free(c->input_memblockq);
105     if (c->output_memblockq)
106         pa_memblockq_free(c->output_memblockq);
107     if (c->defer_event)
108         c->protocol->core->mainloop->defer_free(c->defer_event);
109     pa_xfree(c);
110 }
111
112 static int do_read(struct connection *c) {
113     pa_memchunk chunk;
114     ssize_t r;
115     size_t l;
116
117     if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
118         return 0;
119
120     if (l > c->playback.fragment_size)
121         l = c->playback.fragment_size;
122
123     if (c->playback.current_memblock) 
124         if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
125             pa_memblock_unref(c->playback.current_memblock);
126             c->playback.current_memblock = NULL;
127             c->playback.memblock_index = 0;
128         }
129
130     if (!c->playback.current_memblock) {
131         c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
132         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
133         c->playback.memblock_index = 0;
134     }
135     
136     if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
137         pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));
138         return -1;
139     }
140
141     chunk.memblock = c->playback.current_memblock;
142     chunk.index = c->playback.memblock_index;
143     chunk.length = r;
144     assert(chunk.memblock);
145
146     c->playback.memblock_index += r;
147     
148     assert(c->input_memblockq);
149     pa_memblockq_push_align(c->input_memblockq, &chunk);
150     assert(c->sink_input);
151     pa_sink_notify(c->sink_input->sink);
152     
153     return 0;
154 }
155
156 static int do_write(struct connection *c) {
157     pa_memchunk chunk;
158     ssize_t r;
159
160     if (!c->source_output)
161         return 0;    
162
163     assert(c->output_memblockq);
164     if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
165         return 0;
166     
167     assert(chunk.memblock && chunk.length);
168     
169     if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
170         pa_memblock_unref(chunk.memblock);
171         pa_log("write(): %s", pa_cstrerror(errno));
172         return -1;
173     }
174     
175     pa_memblockq_drop(c->output_memblockq, &chunk, r);
176     pa_memblock_unref(chunk.memblock);
177
178     pa_source_notify(c->source_output->source);
179     
180     return 0;
181 }
182
183 static void do_work(struct connection *c) {
184     assert(c);
185
186     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
187     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
188
189     if (c->dead)
190         return;
191     
192     if (pa_iochannel_is_readable(c->io)) {
193         if (do_read(c) < 0)
194             goto fail;
195     } else if (pa_iochannel_is_hungup(c->io))
196         goto fail;
197
198     if (pa_iochannel_is_writable(c->io)) {
199         if (do_write(c) < 0)
200             goto fail;
201     } 
202
203     return;
204
205 fail:
206
207     if (c->sink_input) {
208         c->dead = 1;
209         
210         pa_iochannel_free(c->io);
211         c->io = NULL;
212
213         pa_memblockq_prebuf_disable(c->input_memblockq);
214         pa_sink_notify(c->sink_input->sink);
215     } else
216         connection_free(c);
217 }
218
219 /*** sink_input callbacks ***/
220
221 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
222     struct connection*c;
223     assert(i && i->userdata && chunk);
224     c = i->userdata;
225     
226     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
227         
228         if (c->dead)
229             connection_free(c);
230         
231         return -1;
232     }
233
234     return 0;
235 }
236
237 static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
238     struct connection*c = i->userdata;
239     assert(i && c && length);
240
241     pa_memblockq_drop(c->input_memblockq, chunk, length);
242
243     /* do something */
244     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
245     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
246 }
247
248 static void sink_input_kill_cb(pa_sink_input *i) {
249     assert(i && i->userdata);
250     connection_free((struct connection *) i->userdata);
251 }
252
253
254 static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
255     struct connection*c = i->userdata;
256     assert(i && c);
257     return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
258 }
259
260 /*** source_output callbacks ***/
261
262 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
263     struct connection *c = o->userdata;
264     assert(o && c && chunk);
265
266     pa_memblockq_push(c->output_memblockq, chunk);
267
268     /* do something */
269     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
270     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
271 }
272
273 static void source_output_kill_cb(pa_source_output *o) {
274     assert(o && o->userdata);
275     connection_free((struct connection *) o->userdata);
276 }
277
278 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
279     struct connection*c = o->userdata;
280     assert(o && c);
281     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
282 }
283
284 /*** client callbacks ***/
285
286 static void client_kill_cb(pa_client *c) {
287     assert(c && c->userdata);
288     connection_free((struct connection *) c->userdata);
289 }
290
291 /*** pa_iochannel callbacks ***/
292
293 static void io_callback(pa_iochannel*io, void *userdata) {
294     struct connection *c = userdata;
295     assert(io && c && c->io == io);
296
297     do_work(c);
298 }
299
300 /*** fixed callback ***/
301
302 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
303     struct connection *c = userdata;
304     assert(a && c && c->defer_event == e);
305
306     do_work(c);
307 }
308
309 /*** socket_server callbacks ***/
310
311 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
312     pa_protocol_simple *p = userdata;
313     struct connection *c = NULL;
314     char cname[256];
315     assert(s && io && p);
316
317     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
318         pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
319         pa_iochannel_free(io);
320         return;
321     }
322
323     c = pa_xmalloc(sizeof(struct connection));
324     c->io = io;
325     c->sink_input = NULL;
326     c->source_output = NULL;
327     c->defer_event = NULL;
328     c->input_memblockq = c->output_memblockq = NULL;
329     c->protocol = p;
330     c->playback.current_memblock = NULL;
331     c->playback.memblock_index = 0;
332     c->playback.fragment_size = 0;
333     c->dead = 0;
334     
335     pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
336     c->client = pa_client_new(p->core, __FILE__, cname);
337     assert(c->client);
338     c->client->owner = p->module;
339     c->client->kill = client_kill_cb;
340     c->client->userdata = c;
341
342     if (p->mode & PLAYBACK) {
343         pa_sink_input_new_data data;
344         size_t l;
345
346         pa_sink_input_new_data_init(&data);
347         data.driver = __FILE__;
348         data.name = c->client->name;
349         pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec);
350         data.module = p->module;
351         data.client = c->client;
352
353         if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) {
354             pa_log("Failed to create sink input.");
355             goto fail;
356         }
357         
358         c->sink_input->peek = sink_input_peek_cb;
359         c->sink_input->drop = sink_input_drop_cb;
360         c->sink_input->kill = sink_input_kill_cb;
361         c->sink_input->get_latency = sink_input_get_latency_cb;
362         c->sink_input->userdata = c;
363
364         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
365         c->input_memblockq = pa_memblockq_new(
366                 0,
367                 l,
368                 0,
369                 pa_frame_size(&p->sample_spec),
370                 (size_t) -1,
371                 l/PLAYBACK_BUFFER_FRAGMENTS,
372                 NULL);
373         assert(c->input_memblockq);
374         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
375         c->playback.fragment_size = l/10;
376
377         pa_sink_notify(c->sink_input->sink);
378     }
379
380     if (p->mode & RECORD) {
381         pa_source_output_new_data data;
382         size_t l;
383
384         pa_source_output_new_data_init(&data);
385         data.driver = __FILE__;
386         data.name = c->client->name;
387         pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec);
388         data.module = p->module;
389         data.client = c->client;
390
391         if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) {
392             pa_log("Failed to create source output.");
393             goto fail;
394         }
395         c->source_output->push = source_output_push_cb;
396         c->source_output->kill = source_output_kill_cb;
397         c->source_output->get_latency = source_output_get_latency_cb;
398         c->source_output->userdata = c;
399
400         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
401         c->output_memblockq = pa_memblockq_new(
402                 0,
403                 l,
404                 0,
405                 pa_frame_size(&p->sample_spec),
406                 1,
407                 0,
408                 NULL);
409         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
410         pa_source_notify(c->source_output->source);
411     }
412
413     pa_iochannel_set_callback(c->io, io_callback, c);
414     pa_idxset_put(p->connections, c, NULL);
415
416     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
417     assert(c->defer_event);
418     p->core->mainloop->defer_enable(c->defer_event, 0);
419     
420     return;
421     
422 fail:
423     if (c)
424         connection_free(c);
425 }
426
427 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
428     pa_protocol_simple* p = NULL;
429     int enable;
430     assert(core && server && ma);
431
432     p = pa_xmalloc0(sizeof(pa_protocol_simple));
433     p->module = m;
434     p->core = core;
435     p->server = server;
436     p->connections = pa_idxset_new(NULL, NULL);
437
438     p->sample_spec = core->default_sample_spec;
439     if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
440         pa_log("Failed to parse sample type specification.");
441         goto fail;
442     }
443
444     p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
445     p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
446     
447     enable = 0;
448     if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
449         pa_log("record= expects a numeric argument.");
450         goto fail;
451     }
452     p->mode = enable ? RECORD : 0;
453
454     enable = 1;
455     if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
456         pa_log("playback= expects a numeric argument.");
457         goto fail;
458     }
459     p->mode |= enable ? PLAYBACK : 0;
460
461     if ((p->mode & (RECORD|PLAYBACK)) == 0) {
462         pa_log("neither playback nor recording enabled for protocol.");
463         goto fail;
464     }
465     
466     pa_socket_server_set_callback(p->server, on_connection, p);
467     
468     return p;
469
470 fail:
471     if (p)
472         pa_protocol_simple_free(p);
473     return NULL;
474 }
475
476
477 void pa_protocol_simple_free(pa_protocol_simple *p) {
478     struct connection *c;
479     assert(p);
480
481     if (p->connections) {
482         while((c = pa_idxset_first(p->connections, NULL)))
483             connection_free(c);
484         
485         pa_idxset_free(p->connections, NULL, NULL);
486     }
487
488     if (p->server)
489         pa_socket_server_unref(p->server);
490     pa_xfree(p);
491 }
492