split polypcore/util.[ch] into polypcore/core-util.[ch] and polyp/util.[ch]
[profile/ivi/pulseaudio.git] / src / polypcore / protocol-simple.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <stdio.h>
30 #include <errno.h>
31 #include <string.h>
32
33 #include <polyp/xmalloc.h>
34
35 #include <polypcore/sink-input.h>
36 #include <polypcore/source-output.h>
37 #include <polypcore/client.h>
38 #include <polypcore/sample-util.h>
39 #include <polypcore/namereg.h>
40 #include <polypcore/log.h>
41
42 #include "protocol-simple.h"
43
/* Upper bound on simultaneously tracked client connections; on_connection()
 * drops an incoming connection that would push the count above this. */
#define MAX_CONNECTIONS (10)
46
47 struct connection {
48     pa_protocol_simple *protocol;
49     pa_iochannel *io;
50     pa_sink_input *sink_input;
51     pa_source_output *source_output;
52     pa_client *client;
53     pa_memblockq *input_memblockq, *output_memblockq;
54     pa_defer_event *defer_event;
55
56     int dead;
57     
58     struct {
59         pa_memblock *current_memblock;
60         size_t memblock_index, fragment_size;
61     } playback;
62 };
63
64 struct pa_protocol_simple {
65     pa_module *module;
66     pa_core *core;
67     pa_socket_server*server;
68     pa_idxset *connections;
69     enum {
70         RECORD = 1,
71         PLAYBACK = 2,
72         DUPLEX = 3
73     } mode;
74     pa_sample_spec sample_spec;
75     char *source_name, *sink_name;
76 };
77
/* Sizing constants used by on_connection(). The *_SECONDS values are
 * multiplied by pa_bytes_per_second() to obtain the byte size handed to
 * pa_memblockq_new(); the *_FRAGMENTS values divide that size when deriving
 * the per-fragment amounts and the socket buffer sizes. */
#define PLAYBACK_BUFFER_SECONDS (0.5)
#define PLAYBACK_BUFFER_FRAGMENTS (10)
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
82
83 static void connection_free(struct connection *c) {
84     assert(c);
85
86     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
87
88     if (c->playback.current_memblock)
89         pa_memblock_unref(c->playback.current_memblock);
90     if (c->sink_input) {
91         pa_sink_input_disconnect(c->sink_input);
92         pa_sink_input_unref(c->sink_input);
93     }
94     if (c->source_output) {
95         pa_source_output_disconnect(c->source_output);
96         pa_source_output_unref(c->source_output);
97     }
98     if (c->client)
99         pa_client_free(c->client);
100     if (c->io)
101         pa_iochannel_free(c->io);
102     if (c->input_memblockq)
103         pa_memblockq_free(c->input_memblockq);
104     if (c->output_memblockq)
105         pa_memblockq_free(c->output_memblockq);
106     if (c->defer_event)
107         c->protocol->core->mainloop->defer_free(c->defer_event);
108     pa_xfree(c);
109 }
110
111 static int do_read(struct connection *c) {
112     pa_memchunk chunk;
113     ssize_t r;
114     size_t l;
115
116     if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
117         return 0;
118
119     if (l > c->playback.fragment_size)
120         l = c->playback.fragment_size;
121
122     if (c->playback.current_memblock) 
123         if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
124             pa_memblock_unref(c->playback.current_memblock);
125             c->playback.current_memblock = NULL;
126             c->playback.memblock_index = 0;
127         }
128
129     if (!c->playback.current_memblock) {
130         c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
131         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
132         c->playback.memblock_index = 0;
133     }
134     
135     if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
136         pa_log_debug(__FILE__": read() failed: %s", r == 0 ? "EOF" : strerror(errno));
137         return -1;
138     }
139
140     chunk.memblock = c->playback.current_memblock;
141     chunk.index = c->playback.memblock_index;
142     chunk.length = r;
143     assert(chunk.memblock);
144
145     c->playback.memblock_index += r;
146     
147     assert(c->input_memblockq);
148     pa_memblockq_push_align(c->input_memblockq, &chunk);
149     assert(c->sink_input);
150     pa_sink_notify(c->sink_input->sink);
151     
152     return 0;
153 }
154
155 static int do_write(struct connection *c) {
156     pa_memchunk chunk;
157     ssize_t r;
158
159     if (!c->source_output)
160         return 0;    
161
162     assert(c->output_memblockq);
163     if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
164         return 0;
165     
166     assert(chunk.memblock && chunk.length);
167     
168     if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
169         pa_memblock_unref(chunk.memblock);
170         pa_log(__FILE__": write(): %s", strerror(errno));
171         return -1;
172     }
173     
174     pa_memblockq_drop(c->output_memblockq, &chunk, r);
175     pa_memblock_unref(chunk.memblock);
176
177     pa_source_notify(c->source_output->source);
178     
179     return 0;
180 }
181
182 static void do_work(struct connection *c) {
183     assert(c);
184
185     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
186     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
187
188     if (c->dead)
189         return;
190     
191     if (pa_iochannel_is_readable(c->io)) {
192         if (do_read(c) < 0)
193             goto fail;
194     } else if (pa_iochannel_is_hungup(c->io))
195         goto fail;
196
197     if (pa_iochannel_is_writable(c->io)) {
198         if (do_write(c) < 0)
199             goto fail;
200     } 
201
202     return;
203
204 fail:
205
206     if (c->sink_input) {
207         c->dead = 1;
208         
209         pa_iochannel_free(c->io);
210         c->io = NULL;
211
212         pa_memblockq_prebuf_disable(c->input_memblockq);
213         pa_sink_notify(c->sink_input->sink);
214     } else
215         connection_free(c);
216 }
217
218 /*** sink_input callbacks ***/
219
220 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
221     struct connection*c;
222     assert(i && i->userdata && chunk);
223     c = i->userdata;
224     
225     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
226         
227         if (c->dead)
228             connection_free(c);
229         
230         return -1;
231     }
232
233     return 0;
234 }
235
236 static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
237     struct connection*c = i->userdata;
238     assert(i && c && length);
239
240     pa_memblockq_drop(c->input_memblockq, chunk, length);
241
242     /* do something */
243     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
244     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
245 }
246
247 static void sink_input_kill_cb(pa_sink_input *i) {
248     assert(i && i->userdata);
249     connection_free((struct connection *) i->userdata);
250 }
251
252
253 static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
254     struct connection*c = i->userdata;
255     assert(i && c);
256     return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
257 }
258
259 /*** source_output callbacks ***/
260
261 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
262     struct connection *c = o->userdata;
263     assert(o && c && chunk);
264
265     pa_memblockq_push(c->output_memblockq, chunk);
266
267     /* do something */
268     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
269     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
270 }
271
272 static void source_output_kill_cb(pa_source_output *o) {
273     assert(o && o->userdata);
274     connection_free((struct connection *) o->userdata);
275 }
276
277 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
278     struct connection*c = o->userdata;
279     assert(o && c);
280     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
281 }
282
283 /*** client callbacks ***/
284
285 static void client_kill_cb(pa_client *c) {
286     assert(c && c->userdata);
287     connection_free((struct connection *) c->userdata);
288 }
289
290 /*** pa_iochannel callbacks ***/
291
292 static void io_callback(pa_iochannel*io, void *userdata) {
293     struct connection *c = userdata;
294     assert(io && c && c->io == io);
295
296     do_work(c);
297 }
298
/*** defer event callback ***/
300
301 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
302     struct connection *c = userdata;
303     assert(a && c && c->defer_event == e);
304
305     do_work(c);
306 }
307
308 /*** socket_server callbacks ***/
309
310 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
311     pa_protocol_simple *p = userdata;
312     struct connection *c = NULL;
313     char cname[256];
314     assert(s && io && p);
315
316     if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
317         pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
318         pa_iochannel_free(io);
319         return;
320     }
321
322     c = pa_xmalloc(sizeof(struct connection));
323     c->io = io;
324     c->sink_input = NULL;
325     c->source_output = NULL;
326     c->defer_event = NULL;
327     c->input_memblockq = c->output_memblockq = NULL;
328     c->protocol = p;
329     c->playback.current_memblock = NULL;
330     c->playback.memblock_index = 0;
331     c->playback.fragment_size = 0;
332     c->dead = 0;
333     
334     pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
335     c->client = pa_client_new(p->core, __FILE__, cname);
336     assert(c->client);
337     c->client->owner = p->module;
338     c->client->kill = client_kill_cb;
339     c->client->userdata = c;
340
341     if (p->mode & PLAYBACK) {
342         pa_sink *sink;
343         size_t l;
344
345         if (!(sink = pa_namereg_get(p->core, p->sink_name, PA_NAMEREG_SINK, 1))) {
346             pa_log(__FILE__": Failed to get sink.");
347             goto fail;
348         }
349
350         if (!(c->sink_input = pa_sink_input_new(sink, __FILE__, c->client->name, &p->sample_spec, NULL, NULL, 0, -1))) {
351             pa_log(__FILE__": Failed to create sink input.");
352             goto fail;
353         }
354         
355         c->sink_input->owner = p->module;
356         c->sink_input->client = c->client;
357         
358         c->sink_input->peek = sink_input_peek_cb;
359         c->sink_input->drop = sink_input_drop_cb;
360         c->sink_input->kill = sink_input_kill_cb;
361         c->sink_input->get_latency = sink_input_get_latency_cb;
362         c->sink_input->userdata = c;
363
364         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
365         c->input_memblockq = pa_memblockq_new(
366                 0,
367                 l,
368                 0,
369                 pa_frame_size(&p->sample_spec),
370                 (size_t) -1,
371                 l/PLAYBACK_BUFFER_FRAGMENTS,
372                 NULL,
373                 p->core->memblock_stat);
374         assert(c->input_memblockq);
375         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
376         c->playback.fragment_size = l/10;
377     }
378
379     if (p->mode & RECORD) {
380         pa_source *source;
381         size_t l;
382
383         if (!(source = pa_namereg_get(p->core, p->source_name, PA_NAMEREG_SOURCE, 1))) {
384             pa_log(__FILE__": Failed to get source.");
385             goto fail;
386         }
387
388         c->source_output = pa_source_output_new(source, __FILE__, c->client->name, &p->sample_spec, NULL, -1);
389         if (!c->source_output) {
390             pa_log(__FILE__": Failed to create source output.");
391             goto fail;
392         }
393         c->source_output->owner = p->module;
394         c->source_output->client = c->client;
395         
396         c->source_output->push = source_output_push_cb;
397         c->source_output->kill = source_output_kill_cb;
398         c->source_output->get_latency = source_output_get_latency_cb;
399         c->source_output->userdata = c;
400
401         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
402         c->output_memblockq = pa_memblockq_new(
403                 0,
404                 l,
405                 0,
406                 pa_frame_size(&p->sample_spec),
407                 1,
408                 0,
409                 NULL,
410                 p->core->memblock_stat);
411         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
412     }
413
414     pa_iochannel_set_callback(c->io, io_callback, c);
415     pa_idxset_put(p->connections, c, NULL);
416
417     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
418     assert(c->defer_event);
419     p->core->mainloop->defer_enable(c->defer_event, 0);
420     
421     return;
422     
423 fail:
424     if (c)
425         connection_free(c);
426 }
427
428 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
429     pa_protocol_simple* p = NULL;
430     int enable;
431     assert(core && server && ma);
432
433     p = pa_xmalloc0(sizeof(pa_protocol_simple));
434     p->module = m;
435     p->core = core;
436     p->server = server;
437     p->connections = pa_idxset_new(NULL, NULL);
438
439     p->sample_spec = core->default_sample_spec;
440     if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
441         pa_log(__FILE__": Failed to parse sample type specification.");
442         goto fail;
443     }
444
445     p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
446     p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
447     
448     enable = 0;
449     if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
450         pa_log(__FILE__": record= expects a numeric argument.");
451         goto fail;
452     }
453     p->mode = enable ? RECORD : 0;
454
455     enable = 1;
456     if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
457         pa_log(__FILE__": playback= expects a numeric argument.");
458         goto fail;
459     }
460     p->mode |= enable ? PLAYBACK : 0;
461
462     if ((p->mode & (RECORD|PLAYBACK)) == 0) {
463         pa_log(__FILE__": neither playback nor recording enabled for protocol.");
464         goto fail;
465     }
466     
467     pa_socket_server_set_callback(p->server, on_connection, p);
468     
469     return p;
470
471 fail:
472     if (p)
473         pa_protocol_simple_free(p);
474     return NULL;
475 }
476
477
478 void pa_protocol_simple_free(pa_protocol_simple *p) {
479     struct connection *c;
480     assert(p);
481
482     if (p->connections) {
483         while((c = pa_idxset_first(p->connections, NULL)))
484             connection_free(c);
485         
486         pa_idxset_free(p->connections, NULL, NULL);
487     }
488
489     if (p->server)
490         pa_socket_server_unref(p->server);
491     pa_xfree(p);
492 }
493