processing echo-cancellation in module-tizenaudio-echo-cancel moved to processor
modules for supporting reference copy.
* created reference copy
* removed delayq
[Version] 15.0.10
[Issue Type] New Feature
Change-Id: Id4529e7f6225b1c3f1d6e499bc915aaef1a940c5
Signed-off-by: Jaechul Lee <jcsing.lee@samsung.com>
libprocessor_la_SOURCES = \
src/echo-cancel/algo_speex.c \
+ src/echo-cancel/algo_reference_copy.c \
src/echo-cancel/algo_adrian.c \
src/echo-cancel/adrian-aec.c \
src/echo-cancel/processor.c \
if ENABLE_WEBRTC
libprocessor_la_SOURCES += src/echo-cancel/algo_webrtc.cpp
libprocessor_la_LIBADD += $(WEBRTC_LIBS)
-libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -std=c++17
+libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -DSUPPORT_METHOD_WEBRTC -std=c++17
endif
module_tizenaudio_echo_cancel_la_SOURCES = src/echo-cancel/module-tizenaudio-echo-cancel.c src/echo-cancel/echo-cancel-def.h
Name: pulseaudio-modules-tizen
Summary: Pulseaudio modules for Tizen
-Version: 15.0.9
+Version: 15.0.10
Release: 0
Group: Multimedia/Audio
License: LGPL-2.1+
--- /dev/null
+/***
+ This file is part of PulseAudio.
+
+ Copyright 2022 Jaechul Lee <jcsing.lee@samsung.com>
+
+ PulseAudio is free software; you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published
+ by the Free Software Foundation; either version 2.1 of the License,
+ or (at your option) any later version.
+
+ PulseAudio is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with PulseAudio; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdint.h>
+#include <pulse/xmalloc.h>
+#include <pulse/sample.h>
+#include <pulse/channelmap.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+
+struct reference_copy {
+ size_t nframes;
+ pa_sample_spec ss;
+ int reference_channels;
+};
+
+void *reference_copy_create(size_t nframes, pa_sample_spec *ss) {
+ struct reference_copy *rc;
+
+ pa_assert(ss);
+
+ rc = pa_xnew0(struct reference_copy, 1);
+ rc->nframes = nframes;
+ rc->ss = *ss;
+
+ return rc;
+}
+
+int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out) {
+ struct reference_copy *rc = priv;
+ size_t rec_bytes, ref_bytes, actual_bytes, total_bytes;
+ int8_t *dst = out;
+
+ pa_assert(rc);
+ pa_assert(rec);
+ pa_assert(ref);
+ pa_assert(out);
+
+ rec_bytes = pa_frame_size(&rc->ss);
+ ref_bytes = rc->reference_channels * pa_sample_size(&rc->ss);
+ actual_bytes = rec_bytes - ref_bytes;
+ total_bytes = rec_bytes * rc->nframes;
+
+ while ((dst - out) < total_bytes) {
+ memcpy(dst, rec, actual_bytes);
+ memcpy(dst + actual_bytes, ref, ref_bytes);
+ dst += rec_bytes;
+ rec += rec_bytes;
+ ref += ref_bytes;
+ }
+
+ return 0;
+}
+
+int32_t reference_copy_destroy(void *priv) {
+ struct reference_copy *rc = priv;
+
+ pa_assert(rc);
+
+ pa_xfree(rc);
+
+ return 0;
+}
+
+int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map) {
+ struct reference_copy *rc = priv;
+ int channels;
+
+ pa_assert(rc);
+ pa_assert(source_ss);
+ pa_assert(sample_spec);
+ pa_assert(map);
+
+ channels = rc->ss.channels - source_ss->channels;
+ if (channels <= 0)
+ return -1;
+
+ *sample_spec = rc->ss;
+ sample_spec->channels = rc->reference_channels = channels;
+
+ pa_channel_map_init_auto(map, channels, PA_CHANNEL_MAP_AIFF);
+
+ return 0;
+}
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(true);
PA_MODULE_USAGE(
- "method=<name of method using for echo cancellation> "
- "blocksize=<the bytes of processing block on source domain> ");
+ "method=<name of method using for echo cancellation> ");
+
+#define DEFAULT_PROCESS_MSEC 10
typedef struct echo_cancel pa_echo_cancel;
struct userdata {
bool enable;
uint32_t n_source_output;
- size_t blocksize;
+
char *default_method;
pa_thread *thread;
pa_echo_cancel *echo_cancel;
/* use in thread */
- pa_memblockq *delayq;
bool enable_in_thread;
+ bool triggered;
pa_asyncmsgq *asyncmsgq_sink;
pa_asyncmsgq *asyncmsgq_source;
#define PA_ECHO_CANCEL(o) (pa_echo_cancel_cast(o))
#define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
-#define CHECK_FLAGS_AEC(x) (x & PA_SOURCE_OUTPUT_ECHO_CANCEL)
-#define CHECK_COUNT_SOURCE_OUTPUT_AEC(x) (x->n_source_output)
#define DEFAULT_AEC_METHOD "speex"
static const char* const valid_modargs[] = {
"method",
- "blocksize",
NULL,
};
-static int proplist_get_fragment_size(pa_proplist *p, size_t *size) {
- const char *fragsize;
- uint32_t blocksize;
+static int proplist_get_fragment_size_usec(pa_proplist *p, pa_sample_spec *sample_spec, pa_usec_t *usec) {
+ const char *prop_fragsize;
+ uint32_t fragsize;
+
+ pa_assert(p);
+ pa_assert(sample_spec);
+ pa_assert(usec);
- if (!(fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
+ if (!(prop_fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
return -1;
- if (pa_atou(fragsize, &blocksize))
+ if (pa_atou(prop_fragsize, &fragsize))
return -1;
- *size = blocksize;
+ *usec = pa_bytes_to_usec(fragsize, sample_spec);
return 0;
}
-static int pa_processor_get_method(pa_source_output *o, const char *default_method, pa_processor_algo_t *method) {
- const char *selected, *requested;
+static int proplist_get_method(pa_proplist *p, const char *default_method, pa_processor_method_t *method) {
+ const char *m;
+ pa_assert(p);
pa_assert(method);
- pa_assert(default_method);
- requested = pa_proplist_gets(o->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD);
- if (!requested)
+ if (!(m = pa_proplist_gets(p, PA_PROP_MEDIA_ECHO_CANCEL_METHOD)))
return -1;
- selected = pa_streq(requested, "default") ? default_method : requested;
-
- if (pa_streq(selected, "webrtc"))
- *method = PA_PROCESSOR_WEBRTC;
- else if (pa_streq(selected, "speex"))
- *method = PA_PROCESSOR_SPEEX;
- else if (pa_streq(selected, "adrian"))
- *method = PA_PROCESSOR_ADRIAN;
- else
- *method = PA_PROCESSOR_SPEEX;
+ *method = pa_processor_get_method(m, default_method);
return 0;
}
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
pa_source_output_assert_ref(o);
- if (CHECK_FLAGS_AEC(o->flags))
+ if (o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)
break;
}
return o ? o : NULL;
}
-static void free_source_output_extra_resource(pa_source_output *o) {
- pa_assert(o);
-
- if (o->thread_info.processor) {
- pa_processor_free(o->thread_info.processor);
- o->thread_info.processor = NULL;
- }
-
- if (o->thread_info.resampler2) {
- pa_resampler_free(o->thread_info.resampler2);
- o->thread_info.resampler2 = NULL;
- }
-
- if (o->thread_info.echo) {
- pa_memblockq_free(o->thread_info.echo);
- o->thread_info.echo = NULL;
- }
-}
-
-static void free_source_output_extra_resource_by_source(pa_source *s) {
- pa_assert(s);
-
- free_source_output_extra_resource(find_source_output_by_flags(s));
-}
-
static pa_usec_t get_round_trip_latency(struct userdata *u) {
pa_usec_t sink_latency;
pa_usec_t source_latency;
pa_assert(u);
pa_assert(u->sink);
- sink_latency = pa_sink_get_latency(u->sink);
- source_latency = pa_source_get_latency(u->source);
+ pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_latency, 0, NULL);
+ pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_latency, 0, NULL);
- pa_log_info("sink latency (%llu), source latency(%llu)", sink_latency, source_latency);
+ pa_log_info("sink latency (%" PRIu64 "), source latency(%" PRIu64 ")", sink_latency, source_latency);
return sink_latency + source_latency;
}
-static int setup_delayq_latency(struct userdata *u, pa_usec_t latency) {
- int64_t write_index, read_index;
- size_t bytes, blocksize, n;
- pa_memchunk silence;
-
- if (proplist_get_fragment_size(u->sink->proplist, &blocksize))
- return -1;
-
- if (u->delayq)
- pa_memblockq_free(u->delayq);
-
- u->delayq = pa_memblockq_new("echo reference delay",
- 0,
- MEMBLOCKQ_MAXLENGTH,
- 0,
- &u->sink->sample_spec,
- 0,
- blocksize,
- 0,
- NULL);
-
- bytes = pa_usec_to_bytes(latency, &u->sink->sample_spec);
- n = (bytes + blocksize - 1) / blocksize;
-
- pa_silence_memchunk_get(
- &u->sink->core->silence_cache,
- u->sink->core->mempool,
- &silence,
- &u->sink->sample_spec,
- blocksize);
-
- if (!silence.memblock)
- return -1;
-
- write_index = pa_memblockq_get_write_index(u->delayq);
- read_index = pa_memblockq_get_read_index(u->delayq);
-
- pa_memblockq_flush_write(u->delayq, true);
- while (n-- > 0)
- pa_memblockq_push(u->delayq, &silence);
-
- pa_log_info("push n(%d) blocks. write_index(%llu->%llu), read_index(%llu->%llu)",
- pa_memblockq_get_nblocks(u->delayq),
- write_index, pa_memblockq_get_write_index(u->delayq),
- read_index, pa_memblockq_get_read_index(u->delayq));
-
- pa_memblock_unref(silence.memblock);
-
- return 0;
-}
-
static int send_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsgq *q) {
struct arguments {
pa_msgobject *o;
}
/* Call from main thread */
-static void set_aec_state(struct userdata *u, bool enable) {
+static void broadcast_echo_cancel_state(struct userdata *u, pa_source_output *o, bool enable) {
void *v[2];
- pa_usec_t latency = 0ULL;
pa_assert(u);
pa_assert(u->source);
- pa_assert(u->sink); /* not allow sink null */
-
- pa_log_info("set_aec state %d -> %d", u->enable, enable);
-
- if (u->enable == enable)
- return;
+ pa_assert(u->sink);
- latency = enable ? get_round_trip_latency(u) : 0ULL;
+ if (enable) {
+ send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source);
+ send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink);
+ }
v[0] = (void *)enable;
- v[1] = &latency;
+ v[1] = o;
- /* There is a race condition between the source thread and the render thread.
- * pa_source_post function can be overlapped at the same time */
pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->echo_cancel),
PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE, (void *)v, 0, NULL);
- if (u->sink)
- pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink),
- PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
+ pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink),
+ PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source),
PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
+ if (!enable) {
+ send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
+ send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL);
+ }
+}
+
+static void set_echo_cancel_state(struct userdata *u, bool enable) {
+ pa_source_output *o;
+
+ pa_assert(u);
+ pa_assert(u->source);
+ pa_assert(u->sink);
+
+ if (u->enable == enable)
+ return;
+
+ o = find_source_output_by_flags(u->source);
+ if (!o) {
+ pa_log_error("Failed to find EC source-output");
+ return;
+ }
+
+ broadcast_echo_cancel_state(u, o, enable);
u->enable = enable;
- pa_log_info("AEC state updated. enable(%d)", u->enable);
+ pa_log_info("AEC state is changed. enable(%d)", u->enable);
}
static int update_state_by_sink(struct userdata *u, bool enable) {
pa_assert(u);
- if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) == 0)
+ if (u->n_source_output == 0)
return 0;
- set_aec_state(u, enable);
+ set_echo_cancel_state(u, enable);
return 0;
}
if (enable) {
if (u->n_source_output++ == 0) {
if (!u->sink || PA_SINK_IS_RUNNING(u->sink->state))
- set_aec_state(u, enable);
+ set_echo_cancel_state(u, enable);
}
} else {
if (--u->n_source_output == 0)
- set_aec_state(u, enable);
+ set_echo_cancel_state(u, enable);
}
return 0;
}
-static int unlink_source_output_in_thread(pa_source_output *o) {
- pa_assert(o);
-
- free_source_output_extra_resource(o);
-
- /* source-output should be remove by render thread to prevent race condition.
- * 1. invoke REMOVE message
- * 2. receive the message in tizenaudio-source
- * 3. send the message to tizenaudio-echo-cancel
- * 4. remove source-output in render thread
- */
- pa_source_process_msg(PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
-
- return 0;
-}
-
static void pa_source_push_echo(pa_source *s, pa_memchunk *chunk) {
pa_source_output *o = NULL;
- pa_memchunk ochunk;
- bool nf = false;
- int r;
o = find_source_output_by_flags(s);
if (!o) {
return;
}
- if (o->thread_info.resampler2) {
- pa_resampler_run(o->thread_info.resampler2, chunk, &ochunk);
- chunk = &ochunk;
- nf = true;
- }
-
- r = pa_memblockq_push(o->thread_info.echo, chunk);
- if (r != 0)
- pa_log_error("Failed to push chunk to memblockq");
-
- if (nf)
- pa_memblock_unref(chunk->memblock);
-
- pa_log_debug("Pushed echo data. index(%u) size(%llums), nblocks(%d) index(%lld:%lld)",
- o->index,
- pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC,
- pa_memblockq_get_nblocks(o->thread_info.echo),
- pa_memblockq_get_write_index(o->thread_info.echo),
- pa_memblockq_get_read_index(o->thread_info.echo));
-}
-
-static void flush_echo_memblockq(pa_source *s) {
- pa_source_output *o;
-
- o = find_source_output_by_flags(s);
- if (!o) {
- pa_log_error("Can't find aec source-output");
- return;
- }
-
- pa_memblockq_flush_write(o->thread_info.echo, true);
+ if (pa_processor_push_data(o->thread_info.processor, chunk) < 0)
+ pa_log_error("Failed to push reference data");
}
static int post_process(pa_source_output *o, pa_memchunk *chunk, pa_memchunk *ochunk) {
int ret = -1;
- int8_t *rec, *ref, *out;
- size_t blocksize;
-
- pa_memchunk tchunk;
-
- if (!o->thread_info.processor) {
- pa_log_error("Failed to get processor");
- return ret;
- }
-
- /*
- * pre-condition
- * sink block >= source block >= blocksize * n
- * if blocksize is not described blocksize, It should be same as source's fragment size
- *
- * chunk must be processed every data.
- */
- /* reference exist */
- if (o->thread_info.echo) {
- size_t block_bytes, length, n;
-
- /* echo queue is not ready that means reference is not started */
- if (pa_memblockq_is_empty(o->thread_info.echo))
- return ret;
-
- blocksize = pa_processor_get_blocksize(o->thread_info.processor);
- block_bytes = blocksize * pa_frame_size(&o->sample_spec);
-
- if (chunk->length % block_bytes) {
- pa_log_warn("Skip to process aec. chunk size must be multiple of blocksize");
- return -1;
- }
-
- n = chunk->length / block_bytes;
- length = n * block_bytes;
-
- if (!(ret = pa_memblockq_peek_fixed_size(o->thread_info.echo, length, &tchunk))) {
- int i;
-
- ochunk->index = 0;
- ochunk->length = length;
- ochunk->memblock = pa_memblock_new(o->core->mempool, length);
-
- rec = pa_memblock_acquire(chunk->memblock);
- ref = pa_memblock_acquire(tchunk.memblock);
- out = pa_memblock_acquire(ochunk->memblock); /* TODO: buffer can be shared rec buffer */
-
- for (i=0; i<n; i++)
- pa_processor_process(o->thread_info.processor,
- rec + (i * block_bytes),
- ref + (i * block_bytes),
- out + (i * block_bytes));
-
- pa_memblock_release(chunk->memblock);
- pa_memblock_release(tchunk.memblock);
- pa_memblock_release(ochunk->memblock);
-
- pa_log_debug("Post-process. i(%u), rec(%llums), ref(%llums) "
- "block(%llums), process(%llums) * n(%d) "
- "silence(%d), index(%lld:%lld)",
- o->index,
- pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC,
- pa_bytes_to_usec(tchunk.length, &o->sample_spec) / PA_USEC_PER_MSEC,
- pa_bytes_to_usec(length, &o->sample_spec) / PA_USEC_PER_MSEC,
- pa_bytes_to_usec(block_bytes, &o->sample_spec) / PA_USEC_PER_MSEC,
- n,
- pa_memblock_is_silence(tchunk.memblock),
- pa_memblockq_get_write_index(o->thread_info.echo),
- pa_memblockq_get_read_index(o->thread_info.echo));
-
- pa_memblock_unref(tchunk.memblock);
- pa_memblockq_drop(o->thread_info.echo, tchunk.length);
- }
- } else {
- /* no reference case like audio_share */
- rec = pa_memblock_acquire(chunk->memblock);
-
- ochunk->index = 0;
- ochunk->length = chunk->length;
- ochunk->memblock = pa_memblock_new(o->core->mempool, chunk->length);
- out = pa_memblock_acquire(ochunk->memblock);
-
- pa_processor_process(o->thread_info.processor, rec, NULL, out);
+ pa_assert(o);
+ pa_assert(chunk);
+ pa_assert(ochunk);
- pa_memblock_release(chunk->memblock);
- pa_memblock_release(ochunk->memblock);
- }
+ if ((ret = pa_processor_process(o->thread_info.processor, chunk, ochunk)) < 0)
+ pa_log_error("Failed to process data");
return ret;
}
struct userdata *u = PA_ECHO_CANCEL(o)->u;
- /* thread that pushes ref data should be called in render thread because of thread safe */
- switch (code) {
- case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA: {
- /* a few pcm data will get lost. */
- if (!u->enable_in_thread)
- return 0;
+ /* trigger resolves a race condition related to post_process between source and render thread */
+ if (u->triggered) {
+ if (code == PA_ECHO_CANCEL_MESSAGE_PUSH_DATA || code == PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO) {
+ pa_source_output *o = NULL;
+ pa_usec_t latency;
- pa_source_post(u->source, chunk);
+ o = find_source_output_by_flags(u->source);
+ o->post_process = post_process;
- return 0;
- }
- case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO: {
- pa_memchunk ochunk;
+ latency = get_round_trip_latency(u);
+
+ if (pa_processor_setup_reference_memblockq_padding(o->thread_info.processor, latency) < 0)
+ pa_log_warn("Failed to setup reference memblockq padding");
- if (!u->enable_in_thread)
- return 0;
+ u->triggered = false;
- pa_memblockq_push(u->delayq, chunk);
+ pa_log_info("Triggered Echo-Cancellation. index(%d), latency(%" PRIu64 ") usec", o->index, latency);
+ }
+ }
- if (!pa_memblockq_peek(u->delayq, &ochunk)) {
- pa_source_push_echo(u->source, &ochunk);
+ /* thread that pushes ref data should be called in render thread because of thread safe */
+ switch (code) {
+ case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA:
+ if (u->enable_in_thread)
+ pa_source_post(u->source, chunk);
- pa_memblock_unref(ochunk.memblock);
- pa_memblockq_drop(u->delayq, ochunk.length);
- }
+ break;
+ case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO:
+ if (u->enable_in_thread)
+ pa_source_push_echo(u->source, chunk);
- return 0;
- }
- case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE : {
+ break;
+ case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE: {
void **v = (void **)data;
- pa_usec_t latency;
+ bool enable = (bool)v[0];
+ pa_source_output *o = (pa_source_output *)v[1];
- u->enable_in_thread = !!(int)v[0];
- latency = *(pa_usec_t *)v[1];
+ u->enable_in_thread = enable;
- if (u->enable_in_thread) {
- if (setup_delayq_latency(u, latency)) {
- pa_log_error("Failed to init delayq");
- return 0;
- }
+ if (enable) {
+ u->triggered = true;
} else {
- if (u->delayq) {
- pa_memblockq_free(u->delayq);
- u->delayq = NULL;
- }
-
- flush_echo_memblockq(u->source);
+ pa_processor_flush(o->thread_info.processor);
+ o->post_process = NULL;
}
- pa_log_info("EC state change (%d)", u->enable_in_thread);
-
- return 0;
+ break;
}
- case PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK:
- unlink_source_output_in_thread((pa_source_output *)data);
- return 0;
default:
- return 0;
+ break;
}
+
+ return 0;
}
static pa_hook_result_t source_output_new_cb(pa_core *c, pa_source_output_new_data *data, void *userdata) {
struct userdata *u = (struct userdata *)userdata;
- const char *method;
+ pa_processor_method_t method;
pa_assert(c);
pa_assert(u);
pa_assert(data);
- method = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD);
- if (!method)
- return PA_HOOK_OK;
-
- if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) {
- pa_log_error("Not allow multi aec instance");
+ if (proplist_get_method(data->proplist, u->default_method, &method) < 0)
return PA_HOOK_OK;
- }
- data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL;
+ /* TODO: source-output can be moved */
data->flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
+ data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL;
+
+ if (method == PA_PROCESSOR_REFERENCE_COPY)
+ data->flags |= PA_SOURCE_OUTPUT_NO_REMAP;
+
+ pa_log_info("echo-cancel source-output will be created. method(%d)", method);
// TODO:add check limitation VARIOUS_RATE?
return PA_HOOK_OK;
return s;
}
-static int check_latency_validation(struct userdata *u, pa_sink *sink, pa_source *source) {
- pa_usec_t sink_usec;
- pa_usec_t source_usec;
- pa_usec_t block_usec;
- size_t blocksize;
-
- if (proplist_get_fragment_size(source->proplist, &blocksize)) {
- pa_log_debug("Failed to get blocksize from source");
- return -1;
- }
-
- source_usec = pa_bytes_to_usec(blocksize, &source->sample_spec);
- block_usec = u->blocksize ? pa_bytes_to_usec(u->blocksize, &source->sample_spec) : source_usec;
-
- /*
- * limitation
- * sink block >= source block >= blocksize * n
- */
- if (source_usec < block_usec) {
- pa_log_debug("Need to check period size. source >= block * n. "
- "source(%llums), block_usec(%llums)",
- source_usec / PA_USEC_PER_MSEC,
- block_usec / PA_USEC_PER_MSEC);
- return -1;
- }
-
- if (!u->sink)
- return 0;
-
- if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) {
- pa_log_debug("Failed to get blocksize from sink");
- return -1;
- }
-
- sink_usec = pa_bytes_to_usec(blocksize, &u->sink->sample_spec);
-
- if (source_usec > sink_usec || sink_usec < block_usec) {
- pa_log_debug("Need to check period size. sink >= source >= block * n. "
- "source(%llums) sink(%llums) block_usec(%llums)",
- source_usec / PA_USEC_PER_MSEC,
- sink_usec / PA_USEC_PER_MSEC,
- block_usec / PA_USEC_PER_MSEC);
- return -1;
- }
-
- return 0;
-}
-
static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, void *userdata) {
struct userdata *u = (struct userdata *)userdata;
- size_t blocksize = u->blocksize;
- pa_processor_algo_t method;
+ pa_processor_method_t method;
+ pa_usec_t process_usec;
+ pa_usec_t sink_process_usec;
+ int r;
pa_assert(c);
pa_assert(o);
pa_assert(u);
pa_assert(o->source);
- if (!CHECK_FLAGS_AEC(o->flags))
+ if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
return PA_HOOK_OK;
- if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) {
+ if (u->n_source_output > 0) {
pa_log_error("Not allow multi aec instance");
goto fail;
}
+ if (proplist_get_method(o->proplist, u->default_method, &method) < 0) {
+ pa_log_error("Failed to get method");
+ goto fail;
+ }
+
u->source = o->source;
u->sink = find_reference_sink_by_proplist(c, o->proplist);
if (!u->sink) {
goto fail;
}
- if (check_latency_validation(u, u->sink, u->source)) {
- pa_log_error("Failed to check latency validation");
+ /* Get period size of sink and source */
+ if (proplist_get_fragment_size_usec(u->source->proplist, &u->source->sample_spec, &process_usec) < 0) {
+ pa_log_error("Failed to get fragment usec");
goto fail;
}
- /* Use the sources fragment size if blocksize is not specified */
- if (!blocksize) {
- if (proplist_get_fragment_size(u->source->proplist, &blocksize)) {
- pa_log_error("Failed to get blocksize");
- goto fail;
- }
+ if (proplist_get_fragment_size_usec(u->sink->proplist, &u->sink->sample_spec, &sink_process_usec) < 0) {
+ pa_log_error("Failed to get fragment usec");
+ goto fail;
}
- if (o->thread_info.resampler)
- blocksize = pa_resampler_result(o->thread_info.resampler, blocksize);
-
- if (pa_processor_get_method(o, u->default_method, &method)) {
- pa_log_error("Can't find method");
+ if (sink_process_usec < process_usec) {
+ pa_log_error("sink process usec should be bigger than source");
goto fail;
}
- o->thread_info.processor = pa_processor_new(blocksize / pa_frame_size(&o->sample_spec),
+ o->thread_info.processor = pa_processor_new(c, process_usec / PA_USEC_PER_MSEC,
&o->sample_spec,
- method, PA_PROCESSOR_FLAGS_ECHO_CANCEL);
+ &o->channel_map,
+ &u->source->sample_spec,
+ method);
if (!o->thread_info.processor) {
pa_log_error("Failed to create pa_processor. echo-cancellation will be disabled");
goto fail;
}
- if (u->sink) {
- if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) {
- pa_log_error("Failed to get blocksize");
- goto fail;
- }
-
- if (!pa_sample_spec_equal(&u->sink->sample_spec, &o->sample_spec)) {
- pa_resampler *resampler2;
-
- resampler2 = pa_resampler_new(
- c->mempool,
- &u->sink->sample_spec, &u->sink->channel_map,
- &o->sample_spec, &o->channel_map,
- c->lfe_crossover_freq,
- c->resample_method, 0);
-
- if (!resampler2) {
- pa_log_error("Failed to allocate resampler2 for echo-cancel");
- goto fail;
- }
-
- o->thread_info.resampler2 = resampler2;
- blocksize = pa_resampler_result(o->thread_info.resampler2, blocksize);
-
- pa_log_info("Use resampler2. blocksize(%d) bytes", blocksize);
- }
-
- o->thread_info.echo = pa_memblockq_new("echo reference",
- 0,
- MEMBLOCKQ_MAXLENGTH,
- 0,
- &o->sample_spec,
- 0,
- blocksize,
- 0,
- &o->source->silence);
-
- if (!o->thread_info.echo) {
- pa_log_error("Failed to alloc memblockq");
- goto fail;
- }
+ r = pa_processor_bind_reference(o->thread_info.processor,
+ sink_process_usec / PA_USEC_PER_MSEC,
+ &u->sink->sample_spec,
+ &u->sink->channel_map);
+ if (r < 0) {
+ pa_log_error("Failed to bind reference source");
+ goto fail;
}
- o->post_process = post_process;
-
- /* connect to sink and source */
- send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source);
- send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink);
+ pa_log_info("echo-cancel source-output(%u) created. process_msec(%u), sink_process_msec(%u), method(%s)",
+ o->index,
+ (uint32_t)(process_usec / PA_USEC_PER_MSEC),
+ (uint32_t)(sink_process_usec / PA_USEC_PER_MSEC),
+ pa_processor_method_to_string(method));
update_state_by_source(u, true);
fail:
o->flags &= ~PA_SOURCE_OUTPUT_ECHO_CANCEL; // TODO: need to consider DONT_MOVE define
- free_source_output_extra_resource(o);
+ if (o->thread_info.processor)
+ pa_processor_free(o->thread_info.processor);
return PA_HOOK_OK;
}
/* Call from main thread */
-static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
+static pa_hook_result_t source_output_unlink_cb(pa_core *c, pa_source_output *o, void *userdata) {
struct userdata *u = (struct userdata *)userdata;
pa_assert(c);
pa_assert(o);
pa_assert(u);
- if (!CHECK_FLAGS_AEC(o->flags))
+ if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
return PA_HOOK_OK;
update_state_by_source(u, false);
- send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
- send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL);
+ return PA_HOOK_OK;
+}
+
+static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
+ struct userdata *u = (struct userdata *)userdata;
+
+ pa_assert(c);
+ pa_assert(o);
+ pa_assert(u);
+
+ if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
+ return PA_HOOK_OK;
+
+ pa_processor_free(o->thread_info.processor);
u->source = NULL;
u->sink = NULL;
+ pa_log_info("echo-cancel source-output(%u) is unlinked", o->index);
+
return PA_HOOK_OK;
}
int pa__init(pa_module *m) {
pa_modargs *ma = NULL;
struct userdata *u = NULL;
- uint32_t blocksize = 0;
pa_assert(m);
return -1;
}
- if (pa_modargs_get_value_u32(ma, "blocksize", &blocksize) < 0) {
- pa_log_info("Failed to get blocksize");
- goto fail;
- }
-
m->userdata = u = pa_xnew0(struct userdata, 1);
u->core = m->core;
u->m = m;
- u->blocksize = blocksize;
u->default_method = pa_xstrdup(pa_modargs_get_value(ma, "method", DEFAULT_AEC_METHOD));
u->echo_cancel = pa_msgobject_new(pa_echo_cancel);
pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT],
PA_HOOK_EARLY, (pa_hook_cb_t) source_output_put_cb, u);
+ u->source_output_unlink_slot =
+ pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK],
+ PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_cb, u);
+
u->source_output_unlink_post_slot =
pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST],
PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_post_cb, u);
if (u->asyncmsgq_source) {
if (u->source) {
+ pa_source_output *o;
+
pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source),
PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)false, 0, NULL, NULL);
send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
- free_source_output_extra_resource_by_source(u->source);
+
+ if ((o = find_source_output_by_flags(u->source)))
+ pa_processor_free(o->thread_info.processor);
}
pa_asyncmsgq_unref(u->asyncmsgq_source);
}
- if (u->delayq)
- pa_memblockq_free(u->delayq);
-
if (u->rtpoll)
pa_rtpoll_free(u->rtpoll);
#endif
#include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
+#include <pulse/channelmap.h>
#include <pulsecore/log.h>
#include <pulsecore/macro.h>
+#include <pulsecore/resampler.h>
+#include <pulsecore/core-util.h>
#include "processor.h"
+//#define __DEBUG__
#ifdef __DEBUG__
#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#endif
-struct pa_processor_algo {
+#define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
+
+typedef struct pa_processor_method_interface pa_processor_method_interface;
+struct pa_processor {
+ pa_core *core;
+ pa_processor_method_interface *intf;
+ pa_processor_method_t method;
+ void *priv;
+
+ size_t process_frames;
+ pa_usec_t process_usec;
+ pa_usec_t reference_process_usec;
+ size_t process_bytes;
+ size_t reference_process_bytes;
+
+ pa_sample_spec *output_ss;
+ pa_channel_map *output_chmap;
+ pa_sample_spec *source_ss;
+ pa_sample_spec reference_memblockq_ss;
+
+ pa_resampler *resampler;
+ pa_memblockq *reference_memblockq;
+ pa_memblockq *output_memblockq;
+
+#ifdef __DEBUG__
+ int fdrec, fdref, fdout;
+ struct timeval before, after;
+#endif
+};
+
+struct pa_processor_method_interface {
+ const char *name;
void *(*create)(size_t nframes, pa_sample_spec *ss);
int32_t (*process)(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
int32_t (*destroy)(void *priv);
+ int32_t (*change_reference_spec)(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map);
};
extern void *adrian_create(size_t nframes, pa_sample_spec *ss);
extern int32_t speex_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
extern int32_t speex_destroy(void *priv);
+#ifdef SUPPORT_METHOD_WEBRTC
extern void *webrtc_audio_create(size_t nframes, pa_sample_spec *ss);
extern int32_t webrtc_audio_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
extern int32_t webrtc_audio_destroy(void *priv);
+#endif
-pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags) {
- pa_processor *processor = NULL;
+extern void *reference_copy_create(size_t nframes, pa_sample_spec *ss);
+extern int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
+extern int32_t reference_copy_destroy(void *priv);
+extern int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map);
- pa_assert(ss);
+#ifdef __DEBUG__
+static void debug_open_file(pa_processor *processor);
+static void debug_timestamp_begin(pa_processor *processor);
+static void debug_timestamp_end(pa_processor *processor);
+static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out);
+static void debug_close_file(pa_processor *processor);
+#else
+#define debug_open_file(x)
+#define debug_timestamp_begin(x)
+#define debug_timestamp_end(x)
+#define debug_write_file(x, a, b, c)
+#define debug_close_file(x)
+#endif
- if (ss->format != PA_SAMPLE_S16LE) {
- pa_log_error("Not supported format(%d)", ss->format);
+static struct pa_processor_method_interface method_table[PA_PROCESSOR_METHOD_MAX] = {
+ {
+ "speex",
+ speex_create,
+ speex_process,
+ speex_destroy,
+ NULL,
+ },
+ {
+ "adrian",
+ adrian_create,
+ adrian_process,
+ adrian_destroy,
+ NULL,
+ },
+#ifdef SUPPORT_METHOD_WEBRTC
+ {
+ "webrtc",
+ webrtc_audio_create,
+ webrtc_audio_process,
+ webrtc_audio_destroy,
+ NULL,
+ },
+#endif
+ {
+ "reference_copy",
+ reference_copy_create,
+ reference_copy_process,
+ reference_copy_destroy,
+ reference_copy_change_reference_spec,
+ },
+};
+
+static size_t pa_processor_usec_to_frame(pa_usec_t usec, pa_sample_spec *spec) {
+ pa_assert(spec);
+
+ return pa_usec_to_bytes(usec, spec) / pa_frame_size(spec);
+}
+
+pa_processor *pa_processor_new(pa_core *core,
+ uint32_t process_msec,
+ pa_sample_spec *output_ss,
+ pa_channel_map *output_map,
+ pa_sample_spec *source_ss,
+ pa_processor_method_t method) {
+ pa_processor *processor;
+ pa_memchunk silence;
+
+ pa_assert(core);
+ pa_assert(output_ss);
+ pa_assert(output_map);
+ pa_assert(source_ss);
+ pa_assert(method < PA_PROCESSOR_METHOD_MAX);
+
+ processor = pa_xnew0(pa_processor, 1);
+ processor->intf = &method_table[method];
+ processor->core = core;
+ processor->process_usec = process_msec * PA_USEC_PER_MSEC;
+ processor->output_ss = output_ss;
+ processor->output_chmap = output_map;
+ processor->source_ss = source_ss;
+ processor->method = method;
+ processor->process_frames = pa_processor_usec_to_frame(processor->process_usec, processor->output_ss);
+ processor->process_bytes = pa_usec_to_bytes(processor->process_usec, processor->output_ss);
+
+ if (!(processor->priv = processor->intf->create(processor->process_frames, output_ss))) {
+ pa_log_error("Failed to create processor. rate(%d), channels(%d).", output_ss->rate, output_ss->channels);
+ pa_xfree(processor);
return NULL;
}
- processor = pa_xnew0(pa_processor, 1);
- processor->intf = pa_xnew0(pa_processor_algo, 1);
- processor->nframes = nframes;
- processor->framesize = pa_frame_size(ss);
-
- switch (backend) {
- case PA_PROCESSOR_SPEEX:
- processor->intf->create = speex_create;
- processor->intf->process = speex_process;
- processor->intf->destroy = speex_destroy;
- break;
- case PA_PROCESSOR_ADRIAN:
- processor->intf->create = adrian_create;
- processor->intf->process = adrian_process;
- processor->intf->destroy = adrian_destroy;
- break;
- case PA_PROCESSOR_WEBRTC:
- processor->intf->create = webrtc_audio_create;
- processor->intf->process = webrtc_audio_process;
- processor->intf->destroy = webrtc_audio_destroy;
- break;
- default:
- pa_log_error("Invalid backend(%d)", backend);
- goto fail;
+ pa_silence_memchunk_get(&core->silence_cache, core->mempool, &silence, output_ss, 0);
+ processor->output_memblockq = pa_memblockq_new("source-output memblockq",
+ 0,
+ MEMBLOCKQ_MAXLENGTH,
+ 0,
+ processor->output_ss,
+ 0,
+ pa_usec_to_bytes(processor->process_usec, output_ss),
+ 0,
+ &silence);
+ pa_memblock_unref(silence.memblock);
+
+ pa_log_info("Created processor. memblockq rate(%d), channels(%d), process_msec(%u), "
+ "process_bytes(%zu), method(%s), source rate(%d), channels(%d)",
+ output_ss->rate,
+ output_ss->channels,
+ process_msec,
+ pa_usec_to_bytes(processor->process_usec, output_ss),
+ method_table[method].name,
+ source_ss->rate,
+ source_ss->channels);
+
+ debug_open_file(processor);
+
+ return processor;
+}
+
+int pa_processor_bind_reference(pa_processor *processor,
+ uint32_t process_msec,
+ pa_sample_spec *reference_ss,
+ pa_channel_map *reference_chmap) {
+
+ pa_sample_spec sample_spec;
+ pa_channel_map channel_map;
+ pa_memchunk silence;
+
+ pa_assert(processor);
+ pa_assert(processor->intf);
+ pa_assert(processor->output_ss);
+ pa_assert(processor->output_chmap);
+ pa_assert(reference_ss);
+ pa_assert(reference_chmap);
+
+ processor->reference_process_usec = process_msec * PA_USEC_PER_MSEC;
+ if (processor->reference_process_usec < processor->process_usec) {
+ pa_log_error("Failed to bind reference. ref_usec(%" PRId64 "), process_usec(%" PRId64 ")",
+ processor->reference_process_usec, processor->process_usec);
+ return -1;
}
- pa_log_info("Use backend(%d) nframes(%zu) framesize(%d)",
- backend, processor->nframes, processor->framesize);
+ sample_spec = *processor->output_ss;
+ channel_map = *processor->output_chmap;
- if (!(processor->priv = processor->intf->create(nframes, ss))) {
- pa_log_error("Failed to create processor");
- goto fail;
+ /* select reference memblockq sample_spec and channelmap */
+ if (processor->intf->change_reference_spec) {
+ if (processor->intf->change_reference_spec(processor->priv, processor->source_ss, &sample_spec, &channel_map) < 0) {
+ pa_log_error("Failed to get reference info");
+ return -1;
+ }
}
-#ifdef __DEBUG__
- {
- static int n = 1;
- char rec[32], ref[32], out[32];
+ /* Create resampler */
+ if (!pa_sample_spec_equal(reference_ss, &sample_spec)) {
+ if (processor->resampler)
+ pa_resampler_free(processor->resampler);
+
+ processor->resampler = pa_resampler_new(processor->core->mempool,
+ reference_ss, reference_chmap,
+ &sample_spec, &channel_map,
+ processor->core->lfe_crossover_freq,
+ processor->core->resample_method, 0);
+ if (!processor->resampler) {
+ pa_log_error("Failed to allocate reference resampler");
+ return -1;
+ }
+ }
- snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n);
- snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n);
- snprintf(out, sizeof(out), "/tmp/out-%d.raw", n);
- n += 1;
+ processor->reference_memblockq_ss = sample_spec;
+ processor->reference_process_bytes = pa_usec_to_bytes(processor->reference_process_usec, &processor->reference_memblockq_ss);
- unlink(rec);
- unlink(ref);
- unlink(out);
+ /* Create memblockq */
+ pa_silence_memchunk_get(&processor->core->silence_cache, processor->core->mempool, &silence, &sample_spec, 0);
- processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777);
- processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777);
- processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777);
- }
-#endif
+ if (processor->reference_memblockq)
+ pa_memblockq_free(processor->reference_memblockq);
- return processor;
+ processor->reference_memblockq = pa_memblockq_new("reference memblockq",
+ 0,
+ MEMBLOCKQ_MAXLENGTH,
+ 0,
+ &processor->reference_memblockq_ss,
+ 0,
+ processor->reference_process_bytes,
+ 0,
+ &silence);
+ pa_memblock_unref(silence.memblock);
-fail:
- pa_xfree(processor->intf);
- pa_xfree(processor);
+ pa_log_debug("Created reference memblockq rate(%d), channels(%d), msec(%u), bytes(%zu)",
+ sample_spec.rate,
+ sample_spec.channels,
+ process_msec,
+ processor->reference_process_bytes);
- return NULL;
+ return 0;
}
-int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) {
- int ret = -1;
+int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency) {
+ pa_memchunk silence;
+ int64_t write_index, read_index;
+ size_t n, bytes;
pa_assert(processor);
+
+ bytes = pa_usec_to_bytes(latency, &processor->reference_memblockq_ss);
+ n = (bytes + (processor->reference_process_bytes - 1)) / processor->reference_process_bytes;
+
+ pa_silence_memchunk_get(
+ &processor->core->silence_cache,
+ processor->core->mempool,
+ &silence,
+ &processor->reference_memblockq_ss,
+ processor->reference_process_bytes);
+
+ write_index = pa_memblockq_get_write_index(processor->reference_memblockq);
+ read_index = pa_memblockq_get_read_index(processor->reference_memblockq);
+
+ for (; n > 0; n--)
+ pa_memblockq_push(processor->reference_memblockq, &silence);
+
+ pa_memblock_unref(silence.memblock);
+
+ pa_log_info("push n(%u) silence blocks. ref_process_bytes(%zu) "
+ "write_index(%" PRId64 "->%" PRId64 "), read_index(%" PRId64 "->%" PRId64 ")",
+ pa_memblockq_get_nblocks(processor->reference_memblockq),
+ processor->reference_process_bytes,
+ write_index, pa_memblockq_get_write_index(processor->reference_memblockq),
+ read_index, pa_memblockq_get_read_index(processor->reference_memblockq));
+
+ return 0;
+}
+
+// TODO naming
+int pa_processor_process(pa_processor *processor, pa_memchunk *chunk, pa_memchunk *ochunk) {
+ int r = -1;
+ int8_t *recording = NULL;
+ int8_t *reference = NULL;
+ int8_t *output = NULL;
+ bool silence = false;
+
+ pa_memchunk ichunk, rchunk;
+
+ pa_assert(processor);
+ pa_assert(processor->output_memblockq);
+ pa_assert(processor->reference_memblockq);
pa_assert(processor->intf);
- pa_assert(rec);
- pa_assert(out);
+ pa_assert(processor->process_bytes > 0ULL);
+ pa_assert(processor->reference_process_bytes > 0ULL);
+ pa_assert(chunk);
+ pa_assert(ochunk);
+
+ if ((r = pa_memblockq_push(processor->output_memblockq, chunk)) < 0) {
+ pa_log_error("Failed to push chunk to echo memblockq");
+ return r;
+ }
-#ifdef __DEBUG__
- if (write(processor->fdrec, rec, processor->nframes * processor->framesize) <= 0)
- pa_log_error("Failed to write rec buffer");
+ if ((r = pa_memblockq_peek_fixed_size(processor->reference_memblockq, processor->reference_process_bytes, &rchunk)) < 0) {
+ pa_log_error("Failed to get memblock from reference memblockq");
+ return r;
+ }
+ silence = pa_memblock_is_silence(rchunk.memblock);
- if (write(processor->fdref, ref, processor->nframes * processor->framesize) <= 0)
- pa_log_error("Failed to write ref buffer");
+ if ((r = pa_memblockq_peek_fixed_size(processor->output_memblockq, processor->process_bytes, &ichunk)) < 0) {
+ pa_log_error("Failed to get memblock from output memblockq");
+ return r;
+ }
- gettimeofday(&processor->before, NULL);
-#endif
+ ochunk->index = 0;
+ ochunk->length = ichunk.length;
+ ochunk->memblock = pa_memblock_new(processor->core->mempool, ochunk->length);
- if (processor->intf->process)
- ret = processor->intf->process(processor->priv, rec, ref, out);
+ recording = pa_memblock_acquire(ichunk.memblock);
+ reference = pa_memblock_acquire(rchunk.memblock);
+ output = pa_memblock_acquire(ochunk->memblock);
-#ifdef __DEBUG__
- if (write(processor->fdout, out, processor->nframes * processor->framesize) <= 0)
- pa_log_error("Failed to write out buffer");
+ debug_timestamp_begin(processor);
- gettimeofday(&processor->after, NULL);
+ r = processor->intf->process(processor->priv, recording, reference, output);
- pa_log_debug("It takes time(%ld) bytes(%d)",
- 1000 * (after.tv_sec-before.tv_sec) + (after.tv_usec-before.tv_usec) / 1000,
- processor->buffer_size);
-#endif
+ debug_timestamp_end(processor);
+ debug_write_file(processor, recording, reference, output);
+
+ pa_memblock_release(ichunk.memblock);
+ pa_memblock_release(rchunk.memblock);
+ pa_memblock_release(ochunk->memblock);
- return ret;
+ pa_memblock_unref(rchunk.memblock);
+ pa_memblockq_drop(processor->reference_memblockq, rchunk.length);
+
+ pa_memblock_unref(ichunk.memblock);
+ pa_memblockq_drop(processor->output_memblockq, ichunk.length);
+
+ pa_log_debug("Post-process. rec(%" PRIu64 "ms), ref(%" PRIu64 "msms) out(%" PRIu64 "ms), "
+ "silence(%d), windex:rindex(%" PRId64 ":%" PRId64 ")",
+ pa_bytes_to_usec(ichunk.length, processor->output_ss) / PA_USEC_PER_MSEC,
+ pa_bytes_to_usec(rchunk.length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC,
+ pa_bytes_to_usec(ochunk->length, processor->output_ss) / PA_USEC_PER_MSEC,
+ silence,
+ pa_memblockq_get_write_index(processor->reference_memblockq),
+ pa_memblockq_get_read_index(processor->reference_memblockq));
+
+ return r;
+}
+
+int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk) {
+ pa_memchunk ochunk;
+ int r;
+
+ pa_assert(processor);
+ pa_assert(chunk);
+
+ if (processor->resampler) {
+ pa_resampler_run(processor->resampler, chunk, &ochunk);
+ chunk = &ochunk;
+ }
+
+ if ((r = pa_memblockq_push(processor->reference_memblockq, chunk)) < 0)
+ pa_log_error("Failed to push chunk to echo memblockq");
+
+ if (processor->resampler)
+ pa_memblock_unref(chunk->memblock);
+
+ pa_log_debug("Pushed echo data. bytes(%zu), msec(%" PRIu64 "ms), nblocks(%d) index(%" PRId64 ":%" PRId64 ")",
+ chunk->length,
+ pa_bytes_to_usec(chunk->length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC,
+ pa_memblockq_get_nblocks(processor->reference_memblockq),
+ pa_memblockq_get_write_index(processor->reference_memblockq),
+ pa_memblockq_get_read_index(processor->reference_memblockq));
+
+ return r;
+}
+
+void pa_processor_flush(pa_processor *processor) {
+ pa_assert(processor);
+
+ if (processor->reference_memblockq)
+ pa_memblockq_flush_read(processor->reference_memblockq);
+
+ if (processor->output_memblockq)
+ pa_memblockq_flush_read(processor->output_memblockq);
}
int pa_processor_free(pa_processor *processor) {
pa_assert(processor);
+ pa_assert(processor->priv);
pa_assert(processor->intf);
- pa_assert(processor->intf->destroy);
-
-#ifdef __DEBUG__
- if (processor->fdrec)
- close(processor->fdrec);
- if (processor->fdref)
- close(processor->fdref);
- if (processor->fdout)
- close(processor->fdout);
-#endif
- if (processor->intf->destroy(processor->priv)) {
+ if (processor->intf->destroy(processor->priv) < 0)
pa_log_error("Failed to destroy processor");
- return -1;
- }
- pa_xfree(processor->intf);
+ if (processor->resampler)
+ pa_resampler_free(processor->resampler);
+
+ if (processor->reference_memblockq)
+ pa_memblockq_free(processor->reference_memblockq);
+
+ if (processor->output_memblockq)
+ pa_memblockq_free(processor->output_memblockq);
+
+ debug_close_file(processor);
+
pa_xfree(processor);
return 0;
}
-size_t pa_processor_get_blocksize(pa_processor *processor) {
- pa_assert(processor);
+pa_processor_method_t pa_processor_get_method(const char *request_method, const char *default_method) {
+ const char *selected;
+
+ if (!request_method || !default_method)
+ return PA_PROCESSOR_SPEEX;
+
+ selected = pa_streq(request_method, "default") ? default_method : request_method;
- return processor->nframes;
+ if (pa_streq(selected, "speex"))
+ return PA_PROCESSOR_SPEEX;
+ else if (pa_streq(selected, "adrian"))
+ return PA_PROCESSOR_ADRIAN;
+#ifdef SUPPORT_METHOD_WEBRTC
+ else if (pa_streq(selected, "webrtc"))
+ return PA_PROCESSOR_WEBRTC;
+#endif
+ else if (pa_streq(selected, "reference_copy"))
+ return PA_PROCESSOR_REFERENCE_COPY;
+ else
+ return PA_PROCESSOR_SPEEX;
+}
+
+const char *pa_processor_method_to_string(pa_processor_method_t method) {
+ if (method >= PA_PROCESSOR_METHOD_MAX)
+ return NULL;
+
+ return method_table[method].name;
+}
+
+#ifdef __DEBUG__
+static void debug_open_file(pa_processor *processor) {
+ static int n = 1;
+ char rec[32], ref[32], out[32];
+
+ snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n);
+ snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n);
+ snprintf(out, sizeof(out), "/tmp/out-%d.raw", n);
+ n += 1;
+
+ unlink(rec);
+ unlink(ref);
+ unlink(out);
+
+ processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777);
+ processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777);
+ processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777);
+}
+
+static void debug_timestamp_begin(pa_processor *processor) {
+ gettimeofday(&processor->before, NULL);
+}
+
+static void debug_timestamp_end(pa_processor *processor) {
+ gettimeofday(&processor->after, NULL);
+
+ pa_log_debug("It takes time (%ld)ms.",
+ 1000 * (processor->after.tv_sec - processor->before.tv_sec)
+ + (processor->after.tv_usec - processor->before.tv_usec) / 1000);
+}
+
+static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) {
+ if (rec && write(processor->fdrec, rec, processor->process_bytes) <= 0)
+ pa_log_error("Failed to write recording buffer");
+
+ if (ref && write(processor->fdref, ref, processor->reference_process_bytes) <= 0)
+ pa_log_error("Failed to write reference buffer");
+
+ if (out && write(processor->fdout, out, processor->process_bytes) <= 0)
+ pa_log_error("Failed to write ref buffer");
+}
+
+static void debug_close_file(pa_processor *processor) {
+ if (processor->fdrec) {
+ close(processor->fdrec);
+ processor->fdrec = -1;
+ }
+
+ if (processor->fdref) {
+ close(processor->fdref);
+ processor->fdref = -1;
+ }
+
+ if (processor->fdout) {
+ close(processor->fdout);
+ processor->fdout = -1;
+ }
}
+#endif
+
-#ifndef foopulseprocessorfoo
-#define foopulseprocessorfoo
-
/***
This file is part of PulseAudio.
USA.
***/
+#ifndef foopulseprocessorfoo
+#define foopulseprocessorfoo
+
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
+#include <pulsecore/core.h>
+#include <pulsecore/memblock.h>
+#include <pulse/channelmap.h>
#include <pulse/sample.h>
-//#define __DEBUG__
-
-typedef enum {
- PA_PROCESSOR_FLAGS_ECHO_CANCEL,
- PA_PROCESSOR_FLAGS_NOISE_SUPPRESSION,
-} pa_process_flags_t;
-
typedef enum {
PA_PROCESSOR_SPEEX,
PA_PROCESSOR_ADRIAN,
+#ifdef SUPPORT_METHOD_WEBRTC
PA_PROCESSOR_WEBRTC,
-} pa_processor_algo_t;
+#endif
+ PA_PROCESSOR_REFERENCE_COPY,
+ PA_PROCESSOR_METHOD_MAX,
+} pa_processor_method_t;
-typedef struct pa_processor_algo pa_processor_algo;
typedef struct pa_processor pa_processor;
-struct pa_processor {
- pa_processor_algo *intf;
- void *priv;
- void *userdata;
- size_t nframes;
- size_t framesize;
-
-#ifdef __DEBUG__
- int fdrec, fdref, fdout;
- struct timeval before, after;
-#endif
-};
-
-pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags);
-int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out);
+pa_processor *pa_processor_new(pa_core *core,
+ uint32_t process_msec,
+ pa_sample_spec *output_ss,
+ pa_channel_map *output_map,
+ pa_sample_spec *source_ss,
+ pa_processor_method_t method);
+int pa_processor_bind_reference(pa_processor *processor,
+ uint32_t process_msec,
+ pa_sample_spec *reference_ss,
+ pa_channel_map *reference_chmap);
+int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency);
+int pa_processor_process(pa_processor *processor, pa_memchunk *rec, pa_memchunk *out);
+int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk);
+void pa_processor_flush(pa_processor *processor);
int pa_processor_free(pa_processor *processor);
-size_t pa_processor_get_blocksize(pa_processor *processor);
+pa_processor_method_t pa_processor_get_method(const char *requset_method, const char *default_method);
+const char *pa_processor_method_to_string(pa_processor_method_t method);
#endif
+
return 0;
}
- case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
- pa_source_output *o = (pa_source_output *)data;
-
- if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
- break;
-
- if (u->ec_asyncmsgq) {
- pa_asyncmsgq_send(u->ec_asyncmsgq, u->ec_object,
- PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK, o, 0, NULL);
- }
-
- return 0;
- }
case PA_SOURCE_MESSAGE_REBUILD_RTPOLL: {
struct arguments {
pa_msgobject *o;