4 This file is part of polypaudio.
6 polypaudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2 of the License,
9 or (at your option) any later version.
11 polypaudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with polypaudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
33 #include <polyp/xmalloc.h>
35 #include <polypcore/sink-input.h>
36 #include <polypcore/source-output.h>
37 #include <polypcore/client.h>
38 #include <polypcore/sample-util.h>
39 #include <polypcore/namereg.h>
40 #include <polypcore/log.h>
42 #include "protocol-simple.h"
44 /* Don't allow more than this many concurrent connections */
45 #define MAX_CONNECTIONS 10
48 pa_protocol_simple *protocol;
50 pa_sink_input *sink_input;
51 pa_source_output *source_output;
53 pa_memblockq *input_memblockq, *output_memblockq;
54 pa_defer_event *defer_event;
59 pa_memblock *current_memblock;
60 size_t memblock_index, fragment_size;
64 struct pa_protocol_simple {
67 pa_socket_server*server;
68 pa_idxset *connections;
74 pa_sample_spec sample_spec;
75 char *source_name, *sink_name;
78 #define PLAYBACK_BUFFER_SECONDS (.5)
79 #define PLAYBACK_BUFFER_FRAGMENTS (10)
80 #define RECORD_BUFFER_SECONDS (5)
81 #define RECORD_BUFFER_FRAGMENTS (100)
83 static void connection_free(struct connection *c) {
86 pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
88 if (c->playback.current_memblock)
89 pa_memblock_unref(c->playback.current_memblock);
91 pa_sink_input_disconnect(c->sink_input);
92 pa_sink_input_unref(c->sink_input);
94 if (c->source_output) {
95 pa_source_output_disconnect(c->source_output);
96 pa_source_output_unref(c->source_output);
99 pa_client_free(c->client);
101 pa_iochannel_free(c->io);
102 if (c->input_memblockq)
103 pa_memblockq_free(c->input_memblockq);
104 if (c->output_memblockq)
105 pa_memblockq_free(c->output_memblockq);
107 c->protocol->core->mainloop->defer_free(c->defer_event);
111 static int do_read(struct connection *c) {
116 if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
119 if (l > c->playback.fragment_size)
120 l = c->playback.fragment_size;
122 if (c->playback.current_memblock)
123 if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
124 pa_memblock_unref(c->playback.current_memblock);
125 c->playback.current_memblock = NULL;
126 c->playback.memblock_index = 0;
129 if (!c->playback.current_memblock) {
130 c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2, c->protocol->core->memblock_stat);
131 assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
132 c->playback.memblock_index = 0;
135 if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
136 pa_log_debug(__FILE__": read() failed: %s", r == 0 ? "EOF" : strerror(errno));
140 chunk.memblock = c->playback.current_memblock;
141 chunk.index = c->playback.memblock_index;
143 assert(chunk.memblock);
145 c->playback.memblock_index += r;
147 assert(c->input_memblockq);
148 pa_memblockq_push_align(c->input_memblockq, &chunk);
149 assert(c->sink_input);
150 pa_sink_notify(c->sink_input->sink);
155 static int do_write(struct connection *c) {
159 if (!c->source_output)
162 assert(c->output_memblockq);
163 if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
166 assert(chunk.memblock && chunk.length);
168 if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
169 pa_memblock_unref(chunk.memblock);
170 pa_log(__FILE__": write(): %s", strerror(errno));
174 pa_memblockq_drop(c->output_memblockq, &chunk, r);
175 pa_memblock_unref(chunk.memblock);
177 pa_source_notify(c->source_output->source);
182 static void do_work(struct connection *c) {
185 assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
186 c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
191 if (pa_iochannel_is_readable(c->io)) {
194 } else if (pa_iochannel_is_hungup(c->io))
197 if (pa_iochannel_is_writable(c->io)) {
209 pa_iochannel_free(c->io);
212 pa_memblockq_prebuf_disable(c->input_memblockq);
213 pa_sink_notify(c->sink_input->sink);
218 /*** sink_input callbacks ***/
220 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
222 assert(i && i->userdata && chunk);
225 if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
236 static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
237 struct connection*c = i->userdata;
238 assert(i && c && length);
240 pa_memblockq_drop(c->input_memblockq, chunk, length);
243 assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
244 c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
247 static void sink_input_kill_cb(pa_sink_input *i) {
248 assert(i && i->userdata);
249 connection_free((struct connection *) i->userdata);
253 static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) {
254 struct connection*c = i->userdata;
256 return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
259 /*** source_output callbacks ***/
261 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
262 struct connection *c = o->userdata;
263 assert(o && c && chunk);
265 pa_memblockq_push(c->output_memblockq, chunk);
268 assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
269 c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
272 static void source_output_kill_cb(pa_source_output *o) {
273 assert(o && o->userdata);
274 connection_free((struct connection *) o->userdata);
277 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
278 struct connection*c = o->userdata;
280 return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
283 /*** client callbacks ***/
285 static void client_kill_cb(pa_client *c) {
286 assert(c && c->userdata);
287 connection_free((struct connection *) c->userdata);
290 /*** pa_iochannel callbacks ***/
292 static void io_callback(pa_iochannel*io, void *userdata) {
293 struct connection *c = userdata;
294 assert(io && c && c->io == io);
299 /*** fixed callback ***/
301 static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) {
302 struct connection *c = userdata;
303 assert(a && c && c->defer_event == e);
308 /*** socket_server callbacks ***/
310 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
311 pa_protocol_simple *p = userdata;
312 struct connection *c = NULL;
314 assert(s && io && p);
316 if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
317 pa_log(__FILE__": Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
318 pa_iochannel_free(io);
322 c = pa_xmalloc(sizeof(struct connection));
324 c->sink_input = NULL;
325 c->source_output = NULL;
326 c->defer_event = NULL;
327 c->input_memblockq = c->output_memblockq = NULL;
329 c->playback.current_memblock = NULL;
330 c->playback.memblock_index = 0;
331 c->playback.fragment_size = 0;
334 pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
335 c->client = pa_client_new(p->core, __FILE__, cname);
337 c->client->owner = p->module;
338 c->client->kill = client_kill_cb;
339 c->client->userdata = c;
341 if (p->mode & PLAYBACK) {
345 if (!(sink = pa_namereg_get(p->core, p->sink_name, PA_NAMEREG_SINK, 1))) {
346 pa_log(__FILE__": Failed to get sink.");
350 if (!(c->sink_input = pa_sink_input_new(sink, __FILE__, c->client->name, &p->sample_spec, NULL, NULL, 0, -1))) {
351 pa_log(__FILE__": Failed to create sink input.");
355 c->sink_input->owner = p->module;
356 c->sink_input->client = c->client;
358 c->sink_input->peek = sink_input_peek_cb;
359 c->sink_input->drop = sink_input_drop_cb;
360 c->sink_input->kill = sink_input_kill_cb;
361 c->sink_input->get_latency = sink_input_get_latency_cb;
362 c->sink_input->userdata = c;
364 l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
365 c->input_memblockq = pa_memblockq_new(
369 pa_frame_size(&p->sample_spec),
371 l/PLAYBACK_BUFFER_FRAGMENTS,
373 p->core->memblock_stat);
374 assert(c->input_memblockq);
375 pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
376 c->playback.fragment_size = l/10;
379 if (p->mode & RECORD) {
383 if (!(source = pa_namereg_get(p->core, p->source_name, PA_NAMEREG_SOURCE, 1))) {
384 pa_log(__FILE__": Failed to get source.");
388 c->source_output = pa_source_output_new(source, __FILE__, c->client->name, &p->sample_spec, NULL, -1);
389 if (!c->source_output) {
390 pa_log(__FILE__": Failed to create source output.");
393 c->source_output->owner = p->module;
394 c->source_output->client = c->client;
396 c->source_output->push = source_output_push_cb;
397 c->source_output->kill = source_output_kill_cb;
398 c->source_output->get_latency = source_output_get_latency_cb;
399 c->source_output->userdata = c;
401 l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
402 c->output_memblockq = pa_memblockq_new(
406 pa_frame_size(&p->sample_spec),
410 p->core->memblock_stat);
411 pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
414 pa_iochannel_set_callback(c->io, io_callback, c);
415 pa_idxset_put(p->connections, c, NULL);
417 c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c);
418 assert(c->defer_event);
419 p->core->mainloop->defer_enable(c->defer_event, 0);
428 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
429 pa_protocol_simple* p = NULL;
431 assert(core && server && ma);
433 p = pa_xmalloc0(sizeof(pa_protocol_simple));
437 p->connections = pa_idxset_new(NULL, NULL);
439 p->sample_spec = core->default_sample_spec;
440 if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
441 pa_log(__FILE__": Failed to parse sample type specification.");
445 p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
446 p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
449 if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
450 pa_log(__FILE__": record= expects a numeric argument.");
453 p->mode = enable ? RECORD : 0;
456 if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
457 pa_log(__FILE__": playback= expects a numeric argument.");
460 p->mode |= enable ? PLAYBACK : 0;
462 if ((p->mode & (RECORD|PLAYBACK)) == 0) {
463 pa_log(__FILE__": neither playback nor recording enabled for protocol.");
467 pa_socket_server_set_callback(p->server, on_connection, p);
473 pa_protocol_simple_free(p);
478 void pa_protocol_simple_free(pa_protocol_simple *p) {
479 struct connection *c;
482 if (p->connections) {
483 while((c = pa_idxset_first(p->connections, NULL)))
486 pa_idxset_free(p->connections, NULL, NULL);
490 pa_socket_server_unref(p->server);