limit the number of concurrent connections for all four protocols
[profile/ivi/pulseaudio-panda.git] / polyp / protocol-simple.c
1 /* $Id$ */
2
3 /***
4   This file is part of polypaudio.
5  
6   polypaudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2 of the License,
9   or (at your option) any later version.
10  
11   polypaudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15  
16   You should have received a copy of the GNU Lesser General Public License
17   along with polypaudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <assert.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <stdio.h>
30 #include <errno.h>
31 #include <string.h>
32
33 #include "sink-input.h"
34 #include "source-output.h"
35 #include "protocol-simple.h"
36 #include "client.h"
37 #include "sample-util.h"
38 #include "namereg.h"
39 #include "xmalloc.h"
40 #include "log.h"
41
/* Upper bound on simultaneously open client connections for this protocol;
 * on_connection() drops any incoming connection that would exceed it. */
#define MAX_CONNECTIONS (10)
44
/* Per-client state for one connection accepted by on_connection(). */
struct connection {
    struct pa_protocol_simple *protocol;    /* protocol instance that accepted us */
    struct pa_iochannel *io;                /* channel to the peer; released in connection_free() */
    struct pa_sink_input *sink_input;       /* only created when the protocol mode has PLAYBACK */
    struct pa_source_output *source_output; /* only created when the protocol mode has RECORD */
    struct pa_client *client;               /* client entry created for this connection */
    struct pa_memblockq *input_memblockq;   /* data read from the peer, consumed by sink_input */
    struct pa_memblockq *output_memblockq;  /* data pushed by source_output, written to the peer */
    struct pa_defer_event *defer_event;     /* enabled by the stream callbacks to schedule do_work() */

    /* State of the block that do_read() is currently filling. */
    struct {
        struct pa_memblock *current_memblock; /* NULL until the first read */
        size_t memblock_index;                /* offset of the next free byte in current_memblock */
        size_t fragment_size;                 /* maximum number of bytes requested per read */
    } playback;
};
59
60 struct pa_protocol_simple {
61     struct pa_module *module;
62     struct pa_core *core;
63     struct pa_socket_server*server;
64     struct pa_idxset *connections;
65     enum {
66         RECORD = 1,
67         PLAYBACK = 2,
68         DUPLEX = 3
69     } mode;
70     struct pa_sample_spec sample_spec;
71     char *source_name, *sink_name;
72 };
73
/* Playback queue sizing used by on_connection(): the input queue holds
 * PLAYBACK_BUFFER_SECONDS worth of audio, and the fragment-derived sizes
 * are that length divided by PLAYBACK_BUFFER_FRAGMENTS. */
#define PLAYBACK_BUFFER_SECONDS (0.5)
#define PLAYBACK_BUFFER_FRAGMENTS (10)

/* Record queue sizing used by on_connection(): the output queue holds
 * RECORD_BUFFER_SECONDS worth of audio; RECORD_BUFFER_FRAGMENTS is used
 * to derive the socket send buffer size from that length. */
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
78
79 static void connection_free(struct connection *c) {
80     assert(c);
81
82     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
83
84     if (c->playback.current_memblock)
85         pa_memblock_unref(c->playback.current_memblock);
86     if (c->sink_input) {
87         pa_sink_input_disconnect(c->sink_input);
88         pa_sink_input_unref(c->sink_input);
89     }
90     if (c->source_output) {
91         pa_source_output_disconnect(c->source_output);
92         pa_source_output_unref(c->source_output);
93     }
94     if (c->client)
95         pa_client_free(c->client);
96     if (c->io)
97         pa_iochannel_free(c->io);
98     if (c->input_memblockq)
99         pa_memblockq_free(c->input_memblockq);
100     if (c->output_memblockq)
101         pa_memblockq_free(c->output_memblockq);
102     if (c->defer_event)
103         c->protocol->core->mainloop->defer_free(c->defer_event);
104     pa_xfree(c);
105 }
106
107 static int do_read(struct connection *c) {
108     struct pa_memchunk chunk;
109     ssize_t r;
110     size_t l;
111
112     if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
113         return 0;
114
115     if (l > c->playback.fragment_size)
116         l = c->playback.fragment_size;
117
118     if (c->playback.current_memblock) 
119         if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
120             pa_memblock_unref(c->playback.current_memblock);
121             c->playback.current_memblock = NULL;
122             c->playback.memblock_index = 0;
123         }
124
125     if (!c->playback.current_memblock) {
126         c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
127         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
128         c->playback.memblock_index = 0;
129     }
130     
131     if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
132         pa_log(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
133         return -1;
134     }
135
136     chunk.memblock = c->playback.current_memblock;
137     chunk.index = c->playback.memblock_index;
138     chunk.length = r;
139     assert(chunk.memblock);
140
141     c->playback.memblock_index += r;
142     
143     assert(c->input_memblockq);
144     pa_memblockq_push_align(c->input_memblockq, &chunk, 0);
145     assert(c->sink_input);
146     pa_sink_notify(c->sink_input->sink);
147     
148     return 0;
149 }
150
151 static int do_write(struct connection *c) {
152     struct pa_memchunk chunk;
153     ssize_t r;
154
155     if (!c->source_output)
156         return 0;    
157
158     assert(c->output_memblockq);
159     if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
160         return 0;
161     
162     assert(chunk.memblock && chunk.length);
163     
164     if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
165         pa_memblock_unref(chunk.memblock);
166         pa_log(__FILE__": write(): %s\n", strerror(errno));
167         return -1;
168     }
169     
170     pa_memblockq_drop(c->output_memblockq, &chunk, r);
171     pa_memblock_unref(chunk.memblock);
172     
173     return 0;
174 }
175
176
177 static void do_work(struct connection *c) {
178     assert(c);
179
180     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
181     c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
182
183     if (pa_iochannel_is_hungup(c->io))
184         goto fail;
185     
186     if (pa_iochannel_is_writable(c->io))
187         if (do_write(c) < 0)
188             goto fail;
189     
190     if (pa_iochannel_is_readable(c->io))
191         if (do_read(c) < 0)
192             goto fail;
193
194     return;
195
196 fail:
197     connection_free(c);
198 }
199
200 /*** sink_input callbacks ***/
201
202 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
203     struct connection*c;
204     assert(i && i->userdata && chunk);
205     c = i->userdata;
206     
207     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0)
208         return -1;
209
210     return 0;
211 }
212
213 static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
214     struct connection*c = i->userdata;
215     assert(i && c && length);
216
217     pa_memblockq_drop(c->input_memblockq, chunk, length);
218
219     /* do something */
220     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
221     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
222 }
223
224 static void sink_input_kill_cb(struct pa_sink_input *i) {
225     assert(i && i->userdata);
226     connection_free((struct connection *) i->userdata);
227 }
228
229
230 static pa_usec_t sink_input_get_latency_cb(struct pa_sink_input *i) {
231     struct connection*c = i->userdata;
232     assert(i && c);
233     return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
234 }
235
236 /*** source_output callbacks ***/
237
238 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
239     struct connection *c = o->userdata;
240     assert(o && c && chunk);
241
242     pa_memblockq_push(c->output_memblockq, chunk, 0);
243
244     /* do something */
245     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
246     c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
247 }
248
249 static void source_output_kill_cb(struct pa_source_output *o) {
250     assert(o && o->userdata);
251     connection_free((struct connection *) o->userdata);
252 }
253
254 static pa_usec_t source_output_get_latency_cb(struct pa_source_output *o) {
255     struct connection*c = o->userdata;
256     assert(o && c);
257     return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
258 }
259
260 /*** client callbacks ***/
261
262 static void client_kill_cb(struct pa_client *c) {
263     assert(c && c->userdata);
264     connection_free((struct connection *) c->userdata);
265 }
266
267 /*** pa_iochannel callbacks ***/
268
269 static void io_callback(struct pa_iochannel*io, void *userdata) {
270     struct connection *c = userdata;
271     assert(io && c && c->io == io);
272
273     do_work(c);
274 }
275
276 /*** fixed callback ***/
277
278 static void defer_callback(struct pa_mainloop_api*a, struct pa_defer_event *e, void *userdata) {
279     struct connection *c = userdata;
280     assert(a && c && c->defer_event == e);
281
282     do_work(c);
283 }
284
285 /*** socket_server callbacks ***/
286
287 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
288     struct pa_protocol_simple *p = userdata;
289     struct connection *c = NULL;
290     char cname[256];
291     assert(s && io && p);
292
293     if (pa_idxset_ncontents(p->connections)+1 > MAX_CONNECTIONS) {
294         pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.\n", MAX_CONNECTIONS);
295         pa_iochannel_free(io);
296         return;
297     }
298
299     c = pa_xmalloc(sizeof(struct connection));
300     c->io = io;
301     c->sink_input = NULL;
302     c->source_output = NULL;
303     c->defer_event = NULL;
304     c->input_memblockq = c->output_memblockq = NULL;
305     c->protocol = p;
306     c->playback.current_memblock = NULL;
307     c->playback.memblock_index = 0;
308     c->playback.fragment_size = 0;
309     
310     pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
311     c->client = pa_client_new(p->core, "SIMPLE", cname);
312     assert(c->client);
313     c->client->owner = p->module;
314     c->client->kill = client_kill_cb;
315     c->client->userdata = c;
316
317     if (p->mode & PLAYBACK) {
318         struct pa_sink *sink;
319         size_t l;
320
321         if (!(sink = pa_namereg_get(p->core, p->sink_name, PA_NAMEREG_SINK, 1))) {
322             pa_log(__FILE__": Failed to get sink.\n");
323             goto fail;
324         }
325
326         if (!(c->sink_input = pa_sink_input_new(sink, c->client->name, &p->sample_spec, 0, -1))) {
327             pa_log(__FILE__": Failed to create sink input.\n");
328             goto fail;
329         }
330         
331         c->sink_input->owner = p->module;
332         c->sink_input->client = c->client;
333         
334         c->sink_input->peek = sink_input_peek_cb;
335         c->sink_input->drop = sink_input_drop_cb;
336         c->sink_input->kill = sink_input_kill_cb;
337         c->sink_input->get_latency = sink_input_get_latency_cb;
338         c->sink_input->userdata = c;
339
340         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
341         c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, p->core->memblock_stat);
342         assert(c->input_memblockq);
343         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
344         c->playback.fragment_size = l/10;
345     }
346
347     if (p->mode & RECORD) {
348         struct pa_source *source;
349         size_t l;
350
351         if (!(source = pa_namereg_get(p->core, p->source_name, PA_NAMEREG_SOURCE, 1))) {
352             pa_log(__FILE__": Failed to get source.\n");
353             goto fail;
354         }
355
356         c->source_output = pa_source_output_new(source, c->client->name, &p->sample_spec, -1);
357         if (!c->source_output) {
358             pa_log(__FILE__": Failed to create source output.\n");
359             goto fail;
360         }
361         c->source_output->owner = p->module;
362         c->source_output->client = c->client;
363         
364         c->source_output->push = source_output_push_cb;
365         c->source_output->kill = source_output_kill_cb;
366         c->source_output->get_latency = source_output_get_latency_cb;
367         c->source_output->userdata = c;
368
369         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
370         c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), 0, 0, p->core->memblock_stat);
371         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
372     }
373
374     pa_iochannel_set_callback(c->io, io_callback, c);
375     pa_idxset_put(p->connections, c, NULL);
376
377     c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
378     assert(c->defer_event);
379     p->core->mainloop->defer_enable(c->defer_event, 0);
380     
381     return;
382     
383 fail:
384     if (c)
385         connection_free(c);
386 }
387
388 struct pa_protocol_simple* pa_protocol_simple_new(struct pa_core *core, struct pa_socket_server *server, struct pa_module *m, struct pa_modargs *ma) {
389     struct pa_protocol_simple* p = NULL;
390     int enable;
391     assert(core && server && ma);
392
393     p = pa_xmalloc0(sizeof(struct pa_protocol_simple));
394     p->module = m;
395     p->core = core;
396     p->server = server;
397     p->connections = pa_idxset_new(NULL, NULL);
398
399     p->sample_spec = core->default_sample_spec;
400     if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
401         pa_log(__FILE__": Failed to parse sample type specification.\n");
402         goto fail;
403     }
404
405     p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
406     p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
407     
408     enable = 0;
409     if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
410         pa_log(__FILE__": record= expects a numeric argument.\n");
411         goto fail;
412     }
413     p->mode = enable ? RECORD : 0;
414
415     enable = 1;
416     if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
417         pa_log(__FILE__": playback= expects a numeric argument.\n");
418         goto fail;
419     }
420     p->mode |= enable ? PLAYBACK : 0;
421
422     if ((p->mode & (RECORD|PLAYBACK)) == 0) {
423         pa_log(__FILE__": neither playback nor recording enabled for protocol.\n");
424         goto fail;
425     }
426     
427     pa_socket_server_set_callback(p->server, on_connection, p);
428     
429     return p;
430
431 fail:
432     if (p)
433         pa_protocol_simple_free(p);
434     return NULL;
435 }
436
437
438 void pa_protocol_simple_free(struct pa_protocol_simple *p) {
439     struct connection *c;
440     assert(p);
441
442     if (p->connections) {
443         while((c = pa_idxset_first(p->connections, NULL)))
444             connection_free(c);
445         
446         pa_idxset_free(p->connections, NULL, NULL);
447     }
448
449     if (p->server)
450         pa_socket_server_unref(p->server);
451     pa_xfree(p);
452 }
453