4 This file is part of PulseAudio.
6 Copyright 2004-2006 Lennart Poettering
8 PulseAudio is free software; you can redistribute it and/or modify
9 it under the terms of the GNU Lesser General Public License as published
10 by the Free Software Foundation; either version 2 of the License,
11 or (at your option) any later version.
13 PulseAudio is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with PulseAudio; if not, write to the Free Software
20 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
34 #include <pulse/xmalloc.h>
36 #include <pulsecore/sink-input.h>
37 #include <pulsecore/source-output.h>
38 #include <pulsecore/client.h>
39 #include <pulsecore/sample-util.h>
40 #include <pulsecore/namereg.h>
41 #include <pulsecore/log.h>
42 #include <pulsecore/core-error.h>
43 #include <pulsecore/atomic.h>
45 #include "protocol-simple.h"
47 /* Don't allow more than this many concurrent connections */
48 #define MAX_CONNECTIONS 10
50 typedef struct connection {
52 pa_protocol_simple *protocol;
54 pa_sink_input *sink_input;
55 pa_source_output *source_output;
57 pa_memblockq *input_memblockq, *output_memblockq;
62 pa_memblock *current_memblock;
63 size_t memblock_index, fragment_size;
68 PA_DECLARE_CLASS(connection);
69 #define CONNECTION(o) (connection_cast(o))
71 static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
73 struct pa_protocol_simple {
76 pa_socket_server*server;
77 pa_idxset *connections;
85 pa_sample_spec sample_spec;
86 char *source_name, *sink_name;
90 SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */
91 SINK_INPUT_MESSAGE_DISABLE_PREBUF /* disabled prebuf, get playback started. */
95 MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
96 MESSAGE_POST_DATA, /* data from source output to main loop */
97 MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
101 #define PLAYBACK_BUFFER_SECONDS (.5)
102 #define PLAYBACK_BUFFER_FRAGMENTS (10)
103 #define RECORD_BUFFER_SECONDS (5)
104 #define RECORD_BUFFER_FRAGMENTS (100)
106 static void connection_free(pa_object *o) {
107 connection *c = CONNECTION(o);
110 if (c->playback.current_memblock)
111 pa_memblock_unref(c->playback.current_memblock);
114 pa_iochannel_free(c->io);
115 if (c->input_memblockq)
116 pa_memblockq_free(c->input_memblockq);
117 if (c->output_memblockq)
118 pa_memblockq_free(c->output_memblockq);
123 static void connection_drop(connection *c) {
126 pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
129 pa_sink_input_disconnect(c->sink_input);
130 pa_sink_input_unref(c->sink_input);
131 c->sink_input = NULL;
134 if (c->source_output) {
135 pa_source_output_disconnect(c->source_output);
136 pa_source_output_unref(c->source_output);
137 c->source_output = NULL;
141 pa_client_free(c->client);
148 static int do_read(connection *c) {
156 if (!c->sink_input || !(l = pa_atomic_load(&c->playback.missing)))
159 if (l > c->playback.fragment_size)
160 l = c->playback.fragment_size;
162 if (c->playback.current_memblock)
163 if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {
164 pa_memblock_unref(c->playback.current_memblock);
165 c->playback.current_memblock = NULL;
166 c->playback.memblock_index = 0;
169 if (!c->playback.current_memblock) {
170 pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l));
171 c->playback.memblock_index = 0;
174 p = pa_memblock_acquire(c->playback.current_memblock);
175 r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l);
176 pa_memblock_release(c->playback.current_memblock);
180 if (errno == EINTR || errno == EAGAIN)
183 pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));
187 chunk.memblock = c->playback.current_memblock;
188 chunk.index = c->playback.memblock_index;
191 c->playback.memblock_index += r;
193 pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
198 static int do_write(connection *c) {
205 if (!c->source_output)
208 if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) {
209 /* pa_log("peek failed"); */
213 pa_assert(chunk.memblock);
214 pa_assert(chunk.length);
216 p = pa_memblock_acquire(chunk.memblock);
217 r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length);
218 pa_memblock_release(chunk.memblock);
220 pa_memblock_unref(chunk.memblock);
224 if (errno == EINTR || errno == EAGAIN)
227 pa_log("write(): %s", pa_cstrerror(errno));
231 pa_memblockq_drop(c->output_memblockq, &chunk, r);
236 static void do_work(connection *c) {
242 if (pa_iochannel_is_readable(c->io)) {
245 } else if (pa_iochannel_is_hungup(c->io))
248 if (pa_iochannel_is_writable(c->io)) {
259 /* If there is a sink input, we first drain what we already have read before shutting down the connection */
262 pa_iochannel_free(c->io);
265 pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
270 static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
271 connection *c = CONNECTION(o);
273 connection_assert_ref(c);
276 case MESSAGE_REQUEST_DATA:
280 case MESSAGE_POST_DATA:
281 /* pa_log("got data %u", chunk->length); */
282 pa_memblockq_push_align(c->output_memblockq, chunk);
286 case MESSAGE_DROP_CONNECTION:
294 /*** sink_input callbacks ***/
296 /* Called from thread context */
297 static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
298 pa_sink_input *i = PA_SINK_INPUT(o);
307 case SINK_INPUT_MESSAGE_POST_DATA: {
310 /* New data from the main loop */
311 pa_memblockq_push_align(c->input_memblockq, chunk);
312 pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
314 /* pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */
319 case SINK_INPUT_MESSAGE_DISABLE_PREBUF: {
320 pa_memblockq_prebuf_disable(c->input_memblockq);
324 case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
325 pa_usec_t *r = userdata;
327 *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
329 /* Fall through, the default handler will add in the extra
330 * latency added by the resampler */
334 return pa_sink_input_process_msg(o, code, userdata, chunk);
338 /* Called from thread context */
339 static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
348 r = pa_memblockq_peek(c->input_memblockq, chunk);
350 /* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
352 if (c->dead && r < 0)
353 pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, c, NULL, NULL);
358 /* Called from thread context */
359 static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) {
360 connection*c = i->userdata;
367 old = pa_memblockq_missing(c->input_memblockq);
368 pa_memblockq_drop(c->input_memblockq, chunk, length);
369 new = pa_memblockq_missing(c->input_memblockq);
371 pa_atomic_store(&c->playback.missing, new);
374 pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
377 /* Called from main context */
378 static void sink_input_kill_cb(pa_sink_input *i) {
380 pa_assert(i->userdata);
382 connection_drop((connection *) i->userdata);
385 /*** source_output callbacks ***/
387 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
395 pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
398 static void source_output_kill_cb(pa_source_output *o) {
408 static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
415 return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
418 /*** client callbacks ***/
420 static void client_kill_cb(pa_client *client) {
424 c = client->userdata;
430 /*** pa_iochannel callbacks ***/
432 static void io_callback(pa_iochannel*io, void *userdata) {
433 connection *c = userdata;
441 /*** socket_server callbacks ***/
443 static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {
444 pa_protocol_simple *p = userdata;
445 connection *c = NULL;
452 if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {
453 pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS);
454 pa_iochannel_free(io);
458 c = pa_msgobject_new(connection, connection_check_type);
459 c->parent.parent.free = connection_free;
460 c->parent.process_msg = connection_process_msg;
462 c->sink_input = NULL;
463 c->source_output = NULL;
464 c->input_memblockq = c->output_memblockq = NULL;
466 c->playback.current_memblock = NULL;
467 c->playback.memblock_index = 0;
468 c->playback.fragment_size = 0;
470 pa_atomic_store(&c->playback.missing, 0);
472 pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
473 pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname));
474 c->client->owner = p->module;
475 c->client->kill = client_kill_cb;
476 c->client->userdata = c;
478 if (p->mode & PLAYBACK) {
479 pa_sink_input_new_data data;
482 pa_sink_input_new_data_init(&data);
483 data.driver = __FILE__;
484 data.name = c->client->name;
485 pa_sink_input_new_data_set_sample_spec(&data, &p->sample_spec);
486 data.module = p->module;
487 data.client = c->client;
489 if (!(c->sink_input = pa_sink_input_new(p->core, &data, 0))) {
490 pa_log("Failed to create sink input.");
494 c->sink_input->parent.process_msg = sink_input_process_msg;
495 c->sink_input->peek = sink_input_peek_cb;
496 c->sink_input->drop = sink_input_drop_cb;
497 c->sink_input->kill = sink_input_kill_cb;
498 c->sink_input->userdata = c;
500 l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
501 c->input_memblockq = pa_memblockq_new(
505 pa_frame_size(&p->sample_spec),
507 l/PLAYBACK_BUFFER_FRAGMENTS,
509 pa_assert(c->input_memblockq);
510 pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
511 c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS;
513 pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq));
515 pa_sink_input_put(c->sink_input);
518 if (p->mode & RECORD) {
519 pa_source_output_new_data data;
522 pa_source_output_new_data_init(&data);
523 data.driver = __FILE__;
524 data.name = c->client->name;
525 pa_source_output_new_data_set_sample_spec(&data, &p->sample_spec);
526 data.module = p->module;
527 data.client = c->client;
529 if (!(c->source_output = pa_source_output_new(p->core, &data, 0))) {
530 pa_log("Failed to create source output.");
533 c->source_output->push = source_output_push_cb;
534 c->source_output->kill = source_output_kill_cb;
535 c->source_output->get_latency = source_output_get_latency_cb;
536 c->source_output->userdata = c;
538 l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
539 c->output_memblockq = pa_memblockq_new(
543 pa_frame_size(&p->sample_spec),
547 pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
549 pa_source_output_put(c->source_output);
553 pa_iochannel_set_callback(c->io, io_callback, c);
554 pa_idxset_put(p->connections, c, NULL);
563 pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
564 pa_protocol_simple* p = NULL;
571 p = pa_xnew0(pa_protocol_simple, 1);
575 p->connections = pa_idxset_new(NULL, NULL);
577 p->sample_spec = core->default_sample_spec;
578 if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) {
579 pa_log("Failed to parse sample type specification.");
583 p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL));
584 p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));
587 if (pa_modargs_get_value_boolean(ma, "record", &enable) < 0) {
588 pa_log("record= expects a numeric argument.");
591 p->mode = enable ? RECORD : 0;
594 if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {
595 pa_log("playback= expects a numeric argument.");
598 p->mode |= enable ? PLAYBACK : 0;
600 if ((p->mode & (RECORD|PLAYBACK)) == 0) {
601 pa_log("neither playback nor recording enabled for protocol.");
605 pa_socket_server_set_callback(p->server, on_connection, p);
611 pa_protocol_simple_free(p);
617 void pa_protocol_simple_free(pa_protocol_simple *p) {
621 if (p->connections) {
622 while((c = pa_idxset_first(p->connections, NULL)))
625 pa_idxset_free(p->connections, NULL, NULL);
629 pa_socket_server_unref(p->server);