From: Jaechul Lee Date: Tue, 29 Mar 2022 08:06:43 +0000 (+0900) Subject: tizenaudio-echo-cancel: Support reference raw copy functionality X-Git-Tag: submit/tizen/20220421.081718^0 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=edbaef30fbd6be4f8dcc1d67c937e138ac1da07b;p=platform%2Fcore%2Fmultimedia%2Fpulseaudio-modules-tizen.git tizenaudio-echo-cancel: Support reference raw copy functionality processing echo-cancellation in module-tizenaudio-echo-cancel moved to processor modules for supporting reference copy. * created reference copy * removed delayq [Version] 15.0.10 [Issue Type] New Feature Change-Id: Id4529e7f6225b1c3f1d6e499bc915aaef1a940c5 Signed-off-by: Jaechul Lee --- diff --git a/Makefile.am b/Makefile.am index c41e653..e61e057 100644 --- a/Makefile.am +++ b/Makefile.am @@ -90,6 +90,7 @@ module_tizenaudio_source2_la_CFLAGS = $(MODULE_CFLAGS) -DPA_MODULE_NAME=module_t libprocessor_la_SOURCES = \ src/echo-cancel/algo_speex.c \ + src/echo-cancel/algo_reference_copy.c \ src/echo-cancel/algo_adrian.c \ src/echo-cancel/adrian-aec.c \ src/echo-cancel/processor.c \ @@ -102,7 +103,7 @@ libprocessor_la_CFLAGS = $(AM_CFLAGS) $(PA_CFLAGS) $(LIBSPEEX_CFLAGS) if ENABLE_WEBRTC libprocessor_la_SOURCES += src/echo-cancel/algo_webrtc.cpp libprocessor_la_LIBADD += $(WEBRTC_LIBS) -libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -std=c++17 +libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -DSUPPORT_METHOD_WEBRTC -std=c++17 endif module_tizenaudio_echo_cancel_la_SOURCES = src/echo-cancel/module-tizenaudio-echo-cancel.c src/echo-cancel/echo-cancel-def.h diff --git a/packaging/pulseaudio-modules-tizen.spec b/packaging/pulseaudio-modules-tizen.spec index 38eda7d..2611033 100644 --- a/packaging/pulseaudio-modules-tizen.spec +++ b/packaging/pulseaudio-modules-tizen.spec @@ -2,7 +2,7 @@ Name: pulseaudio-modules-tizen Summary: Pulseaudio modules for Tizen -Version: 15.0.9 +Version: 15.0.10 Release: 0 Group: Multimedia/Audio License: LGPL-2.1+ diff --git a/src/echo-cancel/algo_reference_copy.c b/src/echo-cancel/algo_reference_copy.c new file mode 100644 index 0000000..2d9cf0b --- /dev/null +++ b/src/echo-cancel/algo_reference_copy.c @@ -0,0 +1,106 @@ +/*** + This file is part of PulseAudio. + + Copyright 2022 Jaechul Lee + + PulseAudio is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + PulseAudio is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. +***/ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include + +struct reference_copy { + size_t nframes; + pa_sample_spec ss; + int reference_channels; +}; + +void *reference_copy_create(size_t nframes, pa_sample_spec *ss) { + struct reference_copy *rc; + + pa_assert(ss); + + rc = pa_xnew0(struct reference_copy, 1); + rc->nframes = nframes; + rc->ss = *ss; + + return rc; +} + +int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out) { + struct reference_copy *rc = priv; + size_t rec_bytes, ref_bytes, actual_bytes, total_bytes; + int8_t *dst = out; + + pa_assert(rc); + pa_assert(rec); + pa_assert(ref); + pa_assert(out); + + rec_bytes = pa_frame_size(&rc->ss); + ref_bytes = rc->reference_channels * pa_sample_size(&rc->ss); + actual_bytes = rec_bytes - ref_bytes; + total_bytes = rec_bytes * rc->nframes; + + while ((dst - out) < total_bytes) { + memcpy(dst, rec, actual_bytes); + memcpy(dst + actual_bytes, ref, ref_bytes); + dst += rec_bytes; + rec += rec_bytes; + ref += ref_bytes; + } + + return 0; +} + +int32_t reference_copy_destroy(void *priv) { + struct reference_copy *rc = priv; + + pa_assert(rc); + + pa_xfree(rc); + + return 0; +} + +int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map) { + struct reference_copy *rc = priv; + int channels; + + pa_assert(rc); + pa_assert(source_ss); + pa_assert(sample_spec); + pa_assert(map); + + channels = rc->ss.channels - source_ss->channels; + if (channels <= 0) + return -1; + + *sample_spec = rc->ss; + sample_spec->channels = rc->reference_channels = channels; + + pa_channel_map_init_auto(map, channels, PA_CHANNEL_MAP_AIFF); + + return 0; +} diff --git a/src/echo-cancel/module-tizenaudio-echo-cancel.c b/src/echo-cancel/module-tizenaudio-echo-cancel.c index 2f95314..8aa456c 100644 --- a/src/echo-cancel/module-tizenaudio-echo-cancel.c +++ b/src/echo-cancel/module-tizenaudio-echo-cancel.c @@ -46,8 +46,9 @@ PA_MODULE_DESCRIPTION("Tizen Audio Echo Cancel"); PA_MODULE_VERSION(PACKAGE_VERSION); PA_MODULE_LOAD_ONCE(true); PA_MODULE_USAGE( - "method= " - "blocksize= "); + "method= "); + +#define DEFAULT_PROCESS_MSEC 10 typedef struct echo_cancel pa_echo_cancel; struct userdata { @@ -67,7 +68,7 @@ struct userdata { bool enable; uint32_t n_source_output; - size_t blocksize; + char *default_method; pa_thread *thread; @@ -76,8 +77,8 @@ struct userdata { pa_echo_cancel *echo_cancel; /* use in thread */ - pa_memblockq *delayq; bool enable_in_thread; + bool triggered; pa_asyncmsgq *asyncmsgq_sink; pa_asyncmsgq *asyncmsgq_source; @@ -92,51 +93,42 @@ PA_DEFINE_PRIVATE_CLASS(pa_echo_cancel, pa_msgobject); #define PA_ECHO_CANCEL(o) (pa_echo_cancel_cast(o)) #define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024) -#define CHECK_FLAGS_AEC(x) (x & PA_SOURCE_OUTPUT_ECHO_CANCEL) -#define CHECK_COUNT_SOURCE_OUTPUT_AEC(x) (x->n_source_output) #define DEFAULT_AEC_METHOD "speex" static const char* const valid_modargs[] = { "method", - "blocksize", NULL, }; -static int proplist_get_fragment_size(pa_proplist *p, size_t *size) { - const char *fragsize; - uint32_t blocksize; +static int proplist_get_fragment_size_usec(pa_proplist *p, pa_sample_spec *sample_spec, pa_usec_t *usec) { + const char *prop_fragsize; + uint32_t fragsize; + + pa_assert(p); + pa_assert(sample_spec); + pa_assert(usec); - if (!(fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE))) + if (!(prop_fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE))) return -1; - if (pa_atou(fragsize, &blocksize)) + if (pa_atou(prop_fragsize, &fragsize)) return -1; - *size = blocksize; + *usec = pa_bytes_to_usec(fragsize, sample_spec); return 0; } -static int pa_processor_get_method(pa_source_output *o, const char *default_method, pa_processor_algo_t *method) { - const char *selected, *requested; +static int proplist_get_method(pa_proplist *p, const char *default_method, pa_processor_method_t *method) { + const char *m; + pa_assert(p); pa_assert(method); - pa_assert(default_method); - requested = pa_proplist_gets(o->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD); - if (!requested) + if (!(m = pa_proplist_gets(p, PA_PROP_MEDIA_ECHO_CANCEL_METHOD))) return -1; - selected = pa_streq(requested, "default") ? default_method : requested; - - if (pa_streq(selected, "webrtc")) - *method = PA_PROCESSOR_WEBRTC; - else if (pa_streq(selected, "speex")) - *method = PA_PROCESSOR_SPEEX; - else if (pa_streq(selected, "adrian")) - *method = PA_PROCESSOR_ADRIAN; - else - *method = PA_PROCESSOR_SPEEX; + *method = pa_processor_get_method(m, default_method); return 0; } @@ -150,38 +142,13 @@ static pa_source_output *find_source_output_by_flags(pa_source *s) { while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) { pa_source_output_assert_ref(o); - if (CHECK_FLAGS_AEC(o->flags)) + if (o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL) break; } return o ? o : NULL; } -static void free_source_output_extra_resource(pa_source_output *o) { - pa_assert(o); - - if (o->thread_info.processor) { - pa_processor_free(o->thread_info.processor); - o->thread_info.processor = NULL; - } - - if (o->thread_info.resampler2) { - pa_resampler_free(o->thread_info.resampler2); - o->thread_info.resampler2 = NULL; - } - - if (o->thread_info.echo) { - pa_memblockq_free(o->thread_info.echo); - o->thread_info.echo = NULL; - } -} - -static void free_source_output_extra_resource_by_source(pa_source *s) { - pa_assert(s); - - free_source_output_extra_resource(find_source_output_by_flags(s)); -} - static pa_usec_t get_round_trip_latency(struct userdata *u) { pa_usec_t sink_latency; pa_usec_t source_latency; @@ -189,65 +156,14 @@ static pa_usec_t get_round_trip_latency(struct userdata *u) { pa_assert(u); pa_assert(u->sink); - sink_latency = pa_sink_get_latency(u->sink); - source_latency = pa_source_get_latency(u->source); + pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_latency, 0, NULL); + pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_latency, 0, NULL); - pa_log_info("sink latency (%llu), source latency(%llu)", sink_latency, source_latency); + pa_log_info("sink latency (%" PRIu64 "), source latency(%" PRIu64 ")", sink_latency, source_latency); return sink_latency + source_latency; } -static int setup_delayq_latency(struct userdata *u, pa_usec_t latency) { - int64_t write_index, read_index; - size_t bytes, blocksize, n; - pa_memchunk silence; - - if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) - return -1; - - if (u->delayq) - pa_memblockq_free(u->delayq); - - u->delayq = pa_memblockq_new("echo reference delay", - 0, - MEMBLOCKQ_MAXLENGTH, - 0, - &u->sink->sample_spec, - 0, - blocksize, - 0, - NULL); - - bytes = pa_usec_to_bytes(latency, &u->sink->sample_spec); - n = (bytes + blocksize - 1) / blocksize; - - pa_silence_memchunk_get( - &u->sink->core->silence_cache, - u->sink->core->mempool, - &silence, - &u->sink->sample_spec, - blocksize); - - if (!silence.memblock) - return -1; - - write_index = pa_memblockq_get_write_index(u->delayq); - read_index = pa_memblockq_get_read_index(u->delayq); - - pa_memblockq_flush_write(u->delayq, true); - while (n-- > 0) - pa_memblockq_push(u->delayq, &silence); - - pa_log_info("push n(%d) blocks. write_index(%llu->%llu), read_index(%llu->%llu)", - pa_memblockq_get_nblocks(u->delayq), - write_index, pa_memblockq_get_write_index(u->delayq), - read_index, pa_memblockq_get_read_index(u->delayq)); - - pa_memblock_unref(silence.memblock); - - return 0; -} - static int send_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsgq *q) { struct arguments { pa_msgobject *o; @@ -277,48 +193,65 @@ static int send_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsg } /* Call from main thread */ -static void set_aec_state(struct userdata *u, bool enable) { +static void broadcast_echo_cancel_state(struct userdata *u, pa_source_output *o, bool enable) { void *v[2]; - pa_usec_t latency = 0ULL; pa_assert(u); pa_assert(u->source); - pa_assert(u->sink); /* not allow sink null */ - - pa_log_info("set_aec state %d -> %d", u->enable, enable); - - if (u->enable == enable) - return; + pa_assert(u->sink); - latency = enable ? get_round_trip_latency(u) : 0ULL; + if (enable) { + send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source); + send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink); + } v[0] = (void *)enable; - v[1] = &latency; + v[1] = o; - /* There is a race condition between the source thread and the render thread. - * pa_source_post function can be overlapped at the same time */ pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->echo_cancel), PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE, (void *)v, 0, NULL); - if (u->sink) - pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), - PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL); + pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), + PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL); pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL); + if (!enable) { + send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL); + send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL); + } +} + +static void set_echo_cancel_state(struct userdata *u, bool enable) { + pa_source_output *o; + + pa_assert(u); + pa_assert(u->source); + pa_assert(u->sink); + + if (u->enable == enable) + return; + + o = find_source_output_by_flags(u->source); + if (!o) { + pa_log_error("Failed to find EC source-output"); + return; + } + + broadcast_echo_cancel_state(u, o, enable); u->enable = enable; - pa_log_info("AEC state updated. enable(%d)", u->enable); + pa_log_info("AEC state is changed. enable(%d)", u->enable); } static int update_state_by_sink(struct userdata *u, bool enable) { pa_assert(u); - if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) == 0) + if (u->n_source_output == 0) return 0; - set_aec_state(u, enable); + set_echo_cancel_state(u, enable); return 0; } @@ -329,37 +262,18 @@ static int update_state_by_source(struct userdata *u, bool enable) { if (enable) { if (u->n_source_output++ == 0) { if (!u->sink || PA_SINK_IS_RUNNING(u->sink->state)) - set_aec_state(u, enable); + set_echo_cancel_state(u, enable); } } else { if (--u->n_source_output == 0) - set_aec_state(u, enable); + set_echo_cancel_state(u, enable); } return 0; } -static int unlink_source_output_in_thread(pa_source_output *o) { - pa_assert(o); - - free_source_output_extra_resource(o); - - /* source-output should be remove by render thread to prevent race condition. - * 1. invoke REMOVE message - * 2. receive the message in tizenaudio-source - * 3. send the message to tizenaudio-echo-cancel - * 4. remove source-output in render thread - */ - pa_source_process_msg(PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL); - - return 0; -} - static void pa_source_push_echo(pa_source *s, pa_memchunk *chunk) { pa_source_output *o = NULL; - pa_memchunk ochunk; - bool nf = false; - int r; o = find_source_output_by_flags(s); if (!o) { @@ -367,129 +281,19 @@ static void pa_source_push_echo(pa_source *s, pa_memchunk *chunk) { return; } - if (o->thread_info.resampler2) { - pa_resampler_run(o->thread_info.resampler2, chunk, &ochunk); - chunk = &ochunk; - nf = true; - } - - r = pa_memblockq_push(o->thread_info.echo, chunk); - if (r != 0) - pa_log_error("Failed to push chunk to memblockq"); - - if (nf) - pa_memblock_unref(chunk->memblock); - - pa_log_debug("Pushed echo data. index(%u) size(%llums), nblocks(%d) index(%lld:%lld)", - o->index, - pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC, - pa_memblockq_get_nblocks(o->thread_info.echo), - pa_memblockq_get_write_index(o->thread_info.echo), - pa_memblockq_get_read_index(o->thread_info.echo)); -} - -static void flush_echo_memblockq(pa_source *s) { - pa_source_output *o; - - o = find_source_output_by_flags(s); - if (!o) { - pa_log_error("Can't find aec source-output"); - return; - } - - pa_memblockq_flush_write(o->thread_info.echo, true); + if (pa_processor_push_data(o->thread_info.processor, chunk) < 0) + pa_log_error("Failed to push reference data"); } static int post_process(pa_source_output *o, pa_memchunk *chunk, pa_memchunk *ochunk) { int ret = -1; - int8_t *rec, *ref, *out; - size_t blocksize; - - pa_memchunk tchunk; - - if (!o->thread_info.processor) { - pa_log_error("Failed to get processor"); - return ret; - } - - /* - * pre-condition - * sink block >= source block >= blocksize * n - * if blocksize is not described blocksize, It should be same as source's fragment size - * - * chunk must be processed every data. - */ - /* reference exist */ - if (o->thread_info.echo) { - size_t block_bytes, length, n; - - /* echo queue is not ready that means reference is not started */ - if (pa_memblockq_is_empty(o->thread_info.echo)) - return ret; - - blocksize = pa_processor_get_blocksize(o->thread_info.processor); - block_bytes = blocksize * pa_frame_size(&o->sample_spec); - - if (chunk->length % block_bytes) { - pa_log_warn("Skip to process aec. chunk size must be multiple of blocksize"); - return -1; - } - - n = chunk->length / block_bytes; - length = n * block_bytes; - - if (!(ret = pa_memblockq_peek_fixed_size(o->thread_info.echo, length, &tchunk))) { - int i; - - ochunk->index = 0; - ochunk->length = length; - ochunk->memblock = pa_memblock_new(o->core->mempool, length); - - rec = pa_memblock_acquire(chunk->memblock); - ref = pa_memblock_acquire(tchunk.memblock); - out = pa_memblock_acquire(ochunk->memblock); /* TODO: buffer can be shared rec buffer */ - - for (i=0; ithread_info.processor, - rec + (i * block_bytes), - ref + (i * block_bytes), - out + (i * block_bytes)); - - pa_memblock_release(chunk->memblock); - pa_memblock_release(tchunk.memblock); - pa_memblock_release(ochunk->memblock); - - pa_log_debug("Post-process. i(%u), rec(%llums), ref(%llums) " - "block(%llums), process(%llums) * n(%d) " - "silence(%d), index(%lld:%lld)", - o->index, - pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC, - pa_bytes_to_usec(tchunk.length, &o->sample_spec) / PA_USEC_PER_MSEC, - pa_bytes_to_usec(length, &o->sample_spec) / PA_USEC_PER_MSEC, - pa_bytes_to_usec(block_bytes, &o->sample_spec) / PA_USEC_PER_MSEC, - n, - pa_memblock_is_silence(tchunk.memblock), - pa_memblockq_get_write_index(o->thread_info.echo), - pa_memblockq_get_read_index(o->thread_info.echo)); - - pa_memblock_unref(tchunk.memblock); - pa_memblockq_drop(o->thread_info.echo, tchunk.length); - } - } else { - /* no reference case like audio_share */ - rec = pa_memblock_acquire(chunk->memblock); - - ochunk->index = 0; - ochunk->length = chunk->length; - ochunk->memblock = pa_memblock_new(o->core->mempool, chunk->length); - out = pa_memblock_acquire(ochunk->memblock); - - pa_processor_process(o->thread_info.processor, rec, NULL, out); + pa_assert(o); + pa_assert(chunk); + pa_assert(ochunk); - pa_memblock_release(chunk->memblock); - pa_memblock_release(ochunk->memblock); - } + if ((ret = pa_processor_process(o->thread_info.processor, chunk, ochunk)) < 0) + pa_log_error("Failed to process data"); return ret; } @@ -504,86 +308,80 @@ static int process_msg( struct userdata *u = PA_ECHO_CANCEL(o)->u; - /* thread that pushes ref data should be called in render thread because of thread safe */ - switch (code) { - case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA: { - /* a few pcm data will get lost. */ - if (!u->enable_in_thread) - return 0; + /* trigger resolves a race condition related to post_process between source and render thread */ + if (u->triggered) { + if (code == PA_ECHO_CANCEL_MESSAGE_PUSH_DATA || code == PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO) { + pa_source_output *o = NULL; + pa_usec_t latency; - pa_source_post(u->source, chunk); + o = find_source_output_by_flags(u->source); + o->post_process = post_process; - return 0; - } - case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO: { - pa_memchunk ochunk; + latency = get_round_trip_latency(u); + + if (pa_processor_setup_reference_memblockq_padding(o->thread_info.processor, latency) < 0) + pa_log_warn("Failed to setup reference memblockq padding"); - if (!u->enable_in_thread) - return 0; + u->triggered = false; - pa_memblockq_push(u->delayq, chunk); + pa_log_info("Triggered Echo-Cancellation. index(%d), latency(%" PRIu64 ") usec", o->index, latency); + } + } - if (!pa_memblockq_peek(u->delayq, &ochunk)) { - pa_source_push_echo(u->source, &ochunk); + /* thread that pushes ref data should be called in render thread because of thread safe */ + switch (code) { + case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA: + if (u->enable_in_thread) + pa_source_post(u->source, chunk); - pa_memblock_unref(ochunk.memblock); - pa_memblockq_drop(u->delayq, ochunk.length); - } + break; + case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO: + if (u->enable_in_thread) + pa_source_push_echo(u->source, chunk); - return 0; - } - case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE : { + break; + case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE: { void **v = (void **)data; - pa_usec_t latency; + bool enable = (bool)v[0]; + pa_source_output *o = (pa_source_output *)v[1]; - u->enable_in_thread = !!(int)v[0]; - latency = *(pa_usec_t *)v[1]; + u->enable_in_thread = enable; - if (u->enable_in_thread) { - if (setup_delayq_latency(u, latency)) { - pa_log_error("Failed to init delayq"); - return 0; - } + if (enable) { + u->triggered = true; } else { - if (u->delayq) { - pa_memblockq_free(u->delayq); - u->delayq = NULL; - } - - flush_echo_memblockq(u->source); + pa_processor_flush(o->thread_info.processor); + o->post_process = NULL; } - pa_log_info("EC state change (%d)", u->enable_in_thread); - - return 0; + break; } - case PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK: - unlink_source_output_in_thread((pa_source_output *)data); - return 0; default: - return 0; + break; } + + return 0; } static pa_hook_result_t source_output_new_cb(pa_core *c, pa_source_output_new_data *data, void *userdata) { struct userdata *u = (struct userdata *)userdata; - const char *method; + pa_processor_method_t method; pa_assert(c); pa_assert(u); pa_assert(data); - method = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD); - if (!method) - return PA_HOOK_OK; - - if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) { - pa_log_error("Not allow multi aec instance"); + if (proplist_get_method(data->proplist, u->default_method, &method) < 0) return PA_HOOK_OK; - } - data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL; + /* TODO: source-output can be moved */ data->flags |= PA_SOURCE_OUTPUT_DONT_MOVE; + data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL; + + if (method == PA_PROCESSOR_REFERENCE_COPY) + data->flags |= PA_SOURCE_OUTPUT_NO_REMAP; + + pa_log_info("echo-cancel source-output will be created. method(%d)", method); // TODO:add check limitation VARIOUS_RATE? return PA_HOOK_OK; @@ -613,72 +411,31 @@ static pa_sink *find_reference_sink_by_proplist(pa_core *c, pa_proplist *p) { return s; } -static int check_latency_validation(struct userdata *u, pa_sink *sink, pa_source *source) { - pa_usec_t sink_usec; - pa_usec_t source_usec; - pa_usec_t block_usec; - size_t blocksize; - - if (proplist_get_fragment_size(source->proplist, &blocksize)) { - pa_log_debug("Failed to get blocksize from source"); - return -1; - } - - source_usec = pa_bytes_to_usec(blocksize, &source->sample_spec); - block_usec = u->blocksize ? pa_bytes_to_usec(u->blocksize, &source->sample_spec) : source_usec; - - /* - * limitation - * sink block >= source block >= blocksize * n - */ - if (source_usec < block_usec) { - pa_log_debug("Need to check period size. source >= block * n. " - "source(%llums), block_usec(%llums)", - source_usec / PA_USEC_PER_MSEC, - block_usec / PA_USEC_PER_MSEC); - return -1; - } - - if (!u->sink) - return 0; - - if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) { - pa_log_debug("Failed to get blocksize from sink"); - return -1; - } - - sink_usec = pa_bytes_to_usec(blocksize, &u->sink->sample_spec); - - if (source_usec > sink_usec || sink_usec < block_usec) { - pa_log_debug("Need to check period size. sink >= source >= block * n. " - "source(%llums) sink(%llums) block_usec(%llums)", - source_usec / PA_USEC_PER_MSEC, - sink_usec / PA_USEC_PER_MSEC, - block_usec / PA_USEC_PER_MSEC); - return -1; - } - - return 0; -} - static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, void *userdata) { struct userdata *u = (struct userdata *)userdata; - size_t blocksize = u->blocksize; - pa_processor_algo_t method; + pa_processor_method_t method; + pa_usec_t process_usec; + pa_usec_t sink_process_usec; + int r; pa_assert(c); pa_assert(o); pa_assert(u); pa_assert(o->source); - if (!CHECK_FLAGS_AEC(o->flags)) + if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)) return PA_HOOK_OK; - if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) { + if (u->n_source_output > 0) { pa_log_error("Not allow multi aec instance"); goto fail; } + if (proplist_get_method(o->proplist, u->default_method, &method) < 0) { + pa_log_error("Failed to get method"); + goto fail; + } + u->source = o->source; u->sink = find_reference_sink_by_proplist(c, o->proplist); if (!u->sink) { @@ -686,83 +443,46 @@ static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, vo goto fail; } - if (check_latency_validation(u, u->sink, u->source)) { - pa_log_error("Failed to check latency validation"); + /* Get period size of sink and source */ + if (proplist_get_fragment_size_usec(u->source->proplist, &u->source->sample_spec, &process_usec) < 0) { + pa_log_error("Failed to get fragment usec"); goto fail; } - /* Use the sources fragment size if blocksize is not specified */ - if (!blocksize) { - if (proplist_get_fragment_size(u->source->proplist, &blocksize)) { - pa_log_error("Failed to get blocksize"); - goto fail; - } + if (proplist_get_fragment_size_usec(u->sink->proplist, &u->sink->sample_spec, &sink_process_usec) < 0) { + pa_log_error("Failed to get fragment usec"); + goto fail; } - if (o->thread_info.resampler) - blocksize = pa_resampler_result(o->thread_info.resampler, blocksize); - - if (pa_processor_get_method(o, u->default_method, &method)) { - pa_log_error("Can't find method"); + if (sink_process_usec < process_usec) { + pa_log_error("sink process usec should be bigger than source"); goto fail; } - o->thread_info.processor = pa_processor_new(blocksize / pa_frame_size(&o->sample_spec), + o->thread_info.processor = pa_processor_new(c, process_usec / PA_USEC_PER_MSEC, &o->sample_spec, - method, PA_PROCESSOR_FLAGS_ECHO_CANCEL); + &o->channel_map, + &u->source->sample_spec, + method); if (!o->thread_info.processor) { pa_log_error("Failed to create pa_processor. echo-cancellation will be disabled"); goto fail; } - if (u->sink) { - if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) { - pa_log_error("Failed to get blocksize"); - goto fail; - } - - if (!pa_sample_spec_equal(&u->sink->sample_spec, &o->sample_spec)) { - pa_resampler *resampler2; - - resampler2 = pa_resampler_new( - c->mempool, - &u->sink->sample_spec, &u->sink->channel_map, - &o->sample_spec, &o->channel_map, - c->lfe_crossover_freq, - c->resample_method, 0); - - if (!resampler2) { - pa_log_error("Failed to allocate resampler2 for echo-cancel"); - goto fail; - } - - o->thread_info.resampler2 = resampler2; - blocksize = pa_resampler_result(o->thread_info.resampler2, blocksize); - - pa_log_info("Use resampler2. blocksize(%d) bytes", blocksize); - } - - o->thread_info.echo = pa_memblockq_new("echo reference", - 0, - MEMBLOCKQ_MAXLENGTH, - 0, - &o->sample_spec, - 0, - blocksize, - 0, - &o->source->silence); - - if (!o->thread_info.echo) { - pa_log_error("Failed to alloc memblockq"); - goto fail; - } + r = pa_processor_bind_reference(o->thread_info.processor, + sink_process_usec / PA_USEC_PER_MSEC, + &u->sink->sample_spec, + &u->sink->channel_map); + if (r < 0) { + pa_log_error("Failed to bind reference source"); + goto fail; } - o->post_process = post_process; - - /* connect to sink and source */ - send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source); - send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink); + pa_log_info("echo-cancel source-output(%u) created. process_msec(%u), sink_process_msec(%u), method(%s)", + o->index, + (uint32_t)(process_usec / PA_USEC_PER_MSEC), + (uint32_t)(sink_process_usec / PA_USEC_PER_MSEC), + pa_processor_method_to_string(method)); update_state_by_source(u, true); @@ -770,30 +490,45 @@ static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, vo fail: o->flags &= ~PA_SOURCE_OUTPUT_ECHO_CANCEL; // TODO: need to consider DONT_MOVE define - free_source_output_extra_resource(o); + if (o->thread_info.processor) + pa_processor_free(o->thread_info.processor); return PA_HOOK_OK; } /* Call from main thread */ -static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) { +static pa_hook_result_t source_output_unlink_cb(pa_core *c, pa_source_output *o, void *userdata) { struct userdata *u = (struct userdata *)userdata; pa_assert(c); pa_assert(o); pa_assert(u); - if (!CHECK_FLAGS_AEC(o->flags)) + if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)) return PA_HOOK_OK; update_state_by_source(u, false); - send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL); - send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL); + return PA_HOOK_OK; +} + +static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) { + struct userdata *u = (struct userdata *)userdata; + + pa_assert(c); + pa_assert(o); + pa_assert(u); + + if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)) + return PA_HOOK_OK; + + pa_processor_free(o->thread_info.processor); u->source = NULL; u->sink = NULL; + pa_log_info("echo-cancel source-output(%u) is unlinked", o->index); + return PA_HOOK_OK; } @@ -877,7 +612,6 @@ fail: int pa__init(pa_module *m) { pa_modargs *ma = NULL; struct userdata *u = NULL; - uint32_t blocksize = 0; pa_assert(m); @@ -886,15 +620,9 @@ int pa__init(pa_module *m) { return -1; } - if (pa_modargs_get_value_u32(ma, "blocksize", &blocksize) < 0) { - pa_log_info("Failed to get blocksize"); - goto fail; - } - m->userdata = u = pa_xnew0(struct userdata, 1); u->core = m->core; u->m = m; - u->blocksize = blocksize; u->default_method = pa_xstrdup(pa_modargs_get_value(ma, "method", DEFAULT_AEC_METHOD)); u->echo_cancel = pa_msgobject_new(pa_echo_cancel); @@ -925,6 +653,10 @@ int pa__init(pa_module *m) { pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT], PA_HOOK_EARLY, (pa_hook_cb_t) source_output_put_cb, u); + u->source_output_unlink_slot = + pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK], + PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_cb, u); + u->source_output_unlink_post_slot = pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST], PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_post_cb, u); @@ -989,19 +721,20 @@ void pa__done(pa_module *m) { if (u->asyncmsgq_source) { if (u->source) { + pa_source_output *o; + pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)false, 0, NULL, NULL); send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL); - free_source_output_extra_resource_by_source(u->source); + + if ((o = find_source_output_by_flags(u->source))) + pa_processor_free(o->thread_info.processor); } pa_asyncmsgq_unref(u->asyncmsgq_source); } - if (u->delayq) - pa_memblockq_free(u->delayq); - if (u->rtpoll) pa_rtpoll_free(u->rtpoll); diff --git a/src/echo-cancel/processor.c b/src/echo-cancel/processor.c index 3681347..a97b99c 100644 --- a/src/echo-cancel/processor.c +++ b/src/echo-cancel/processor.c @@ -24,11 +24,16 @@ #endif #include +#include +#include #include #include +#include +#include #include "processor.h" +//#define __DEBUG__ #ifdef __DEBUG__ #include #include @@ -37,10 +42,42 @@ #include #endif -struct pa_processor_algo { +#define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024) + +typedef struct pa_processor_method_interface pa_processor_method_interface; +struct pa_processor { + pa_core *core; + pa_processor_method_interface *intf; + pa_processor_method_t method; + void *priv; + + size_t process_frames; + pa_usec_t process_usec; + pa_usec_t reference_process_usec; + size_t process_bytes; + size_t reference_process_bytes; + + pa_sample_spec *output_ss; + pa_channel_map *output_chmap; + pa_sample_spec *source_ss; + pa_sample_spec reference_memblockq_ss; + + pa_resampler *resampler; + pa_memblockq *reference_memblockq; + pa_memblockq *output_memblockq; + +#ifdef __DEBUG__ + int fdrec, fdref, fdout; + struct timeval before, after; +#endif +}; + +struct pa_processor_method_interface { + const char *name; void *(*create)(size_t nframes, pa_sample_spec *ss); int32_t (*process)(void *priv, int8_t *rec, int8_t *ref, int8_t *out); int32_t (*destroy)(void *priv); + int32_t (*change_reference_spec)(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map); }; extern void *adrian_create(size_t nframes, pa_sample_spec *ss); @@ -51,145 +88,463 @@ extern void *speex_create(size_t nframes, pa_sample_spec *ss); extern int32_t speex_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out); extern int32_t speex_destroy(void *priv); +#ifdef SUPPORT_METHOD_WEBRTC extern void *webrtc_audio_create(size_t nframes, pa_sample_spec *ss); extern int32_t webrtc_audio_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out); extern int32_t webrtc_audio_destroy(void *priv); +#endif -pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags) { - pa_processor *processor = NULL; +extern void *reference_copy_create(size_t nframes, pa_sample_spec *ss); +extern int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out); +extern int32_t reference_copy_destroy(void *priv); +extern int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map); - pa_assert(ss); +#ifdef __DEBUG__ +static void debug_open_file(pa_processor *processor); +static void debug_timestamp_begin(pa_processor *processor); +static void debug_timestamp_end(pa_processor *processor); +static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out); +static void debug_close_file(pa_processor *processor); +#else +#define debug_open_file(x) +#define debug_timestamp_begin(x) +#define debug_timestamp_end(x) +#define debug_write_file(x, a, b, c) +#define debug_close_file(x) +#endif - if (ss->format != PA_SAMPLE_S16LE) { - pa_log_error("Not supported format(%d)", ss->format); +static struct pa_processor_method_interface method_table[PA_PROCESSOR_METHOD_MAX] = { + { + "speex", + speex_create, + speex_process, + speex_destroy, + NULL, + }, + { + "adrian", + adrian_create, + adrian_process, + adrian_destroy, + NULL, + }, +#ifdef SUPPORT_METHOD_WEBRTC + { + "webrtc", + webrtc_audio_create, + webrtc_audio_process, + webrtc_audio_destroy, + NULL, + }, +#endif + { + "reference_copy", + reference_copy_create, + reference_copy_process, + reference_copy_destroy, + reference_copy_change_reference_spec, + }, +}; + +static size_t pa_processor_usec_to_frame(pa_usec_t usec, pa_sample_spec *spec) { + pa_assert(spec); + + return pa_usec_to_bytes(usec, spec) / pa_frame_size(spec); +} + +pa_processor *pa_processor_new(pa_core *core, + uint32_t process_msec, + pa_sample_spec *output_ss, + pa_channel_map *output_map, + pa_sample_spec *source_ss, + pa_processor_method_t method) { + pa_processor *processor; + pa_memchunk silence; + + pa_assert(core); + pa_assert(output_ss); + pa_assert(output_map); + pa_assert(source_ss); + pa_assert(method < PA_PROCESSOR_METHOD_MAX); + + processor = pa_xnew0(pa_processor, 1); + processor->intf = &method_table[method]; + processor->core = core; + processor->process_usec = process_msec * PA_USEC_PER_MSEC; + processor->output_ss = output_ss; + processor->output_chmap = output_map; + processor->source_ss = source_ss; + processor->method = method; + processor->process_frames = pa_processor_usec_to_frame(processor->process_usec, processor->output_ss); + processor->process_bytes = pa_usec_to_bytes(processor->process_usec, processor->output_ss); + + if (!(processor->priv = processor->intf->create(processor->process_frames, output_ss))) { + pa_log_error("Failed to create processor. rate(%d), channels(%d).", output_ss->rate, output_ss->channels); + pa_xfree(processor); return NULL; } - processor = pa_xnew0(pa_processor, 1); - processor->intf = pa_xnew0(pa_processor_algo, 1); - processor->nframes = nframes; - processor->framesize = pa_frame_size(ss); - - switch (backend) { - case PA_PROCESSOR_SPEEX: - processor->intf->create = speex_create; - processor->intf->process = speex_process; - processor->intf->destroy = speex_destroy; - break; - case PA_PROCESSOR_ADRIAN: - processor->intf->create = adrian_create; - processor->intf->process = adrian_process; - processor->intf->destroy = adrian_destroy; - break; - case PA_PROCESSOR_WEBRTC: - processor->intf->create = webrtc_audio_create; - processor->intf->process = webrtc_audio_process; - processor->intf->destroy = webrtc_audio_destroy; - break; - default: - pa_log_error("Invalid backend(%d)", backend); - goto fail; + pa_silence_memchunk_get(&core->silence_cache, core->mempool, &silence, output_ss, 0); + processor->output_memblockq = pa_memblockq_new("source-output memblockq", + 0, + MEMBLOCKQ_MAXLENGTH, + 0, + processor->output_ss, + 0, + pa_usec_to_bytes(processor->process_usec, output_ss), + 0, + &silence); + pa_memblock_unref(silence.memblock); + + pa_log_info("Created processor. memblockq rate(%d), channels(%d), process_msec(%u), " + "process_bytes(%zu), method(%s), source rate(%d), channels(%d)", + output_ss->rate, + output_ss->channels, + process_msec, + pa_usec_to_bytes(processor->process_usec, output_ss), + method_table[method].name, + source_ss->rate, + source_ss->channels); + + debug_open_file(processor); + + return processor; +} + +int pa_processor_bind_reference(pa_processor *processor, + uint32_t process_msec, + pa_sample_spec *reference_ss, + pa_channel_map *reference_chmap) { + + pa_sample_spec sample_spec; + pa_channel_map channel_map; + pa_memchunk silence; + + pa_assert(processor); + pa_assert(processor->intf); + pa_assert(processor->output_ss); + pa_assert(processor->output_chmap); + pa_assert(reference_ss); + pa_assert(reference_chmap); + + processor->reference_process_usec = process_msec * PA_USEC_PER_MSEC; + if (processor->reference_process_usec < processor->process_usec) { + pa_log_error("Failed to bind reference. ref_usec(%" PRId64 "), process_usec(%" PRId64 ")", + processor->reference_process_usec, processor->process_usec); + return -1; } - pa_log_info("Use backend(%d) nframes(%zu) framesize(%d)", - backend, processor->nframes, processor->framesize); + sample_spec = *processor->output_ss; + channel_map = *processor->output_chmap; - if (!(processor->priv = processor->intf->create(nframes, ss))) { - pa_log_error("Failed to create processor"); - goto fail; + /* select reference memblockq sample_spec and channelmap */ + if (processor->intf->change_reference_spec) { + if (processor->intf->change_reference_spec(processor->priv, processor->source_ss, &sample_spec, &channel_map) < 0) { + pa_log_error("Failed to get reference info"); + return -1; + } } -#ifdef __DEBUG__ - { - static int n = 1; - char rec[32], ref[32], out[32]; + /* Create resampler */ + if (!pa_sample_spec_equal(reference_ss, &sample_spec)) { + if (processor->resampler) + pa_resampler_free(processor->resampler); + + processor->resampler = pa_resampler_new(processor->core->mempool, + reference_ss, reference_chmap, + &sample_spec, &channel_map, + processor->core->lfe_crossover_freq, + processor->core->resample_method, 0); + if (!processor->resampler) { + pa_log_error("Failed to allocate reference resampler"); + return -1; + } + } - snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n); - snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n); - snprintf(out, sizeof(out), "/tmp/out-%d.raw", n); - n += 1; + processor->reference_memblockq_ss = sample_spec; + processor->reference_process_bytes = pa_usec_to_bytes(processor->reference_process_usec, &processor->reference_memblockq_ss); - unlink(rec); - unlink(ref); - unlink(out); + /* Create memblockq */ + pa_silence_memchunk_get(&processor->core->silence_cache, processor->core->mempool, &silence, &sample_spec, 0); - processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777); - processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777); - processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777); - } -#endif + if (processor->reference_memblockq) + pa_memblockq_free(processor->reference_memblockq); - return processor; + processor->reference_memblockq = pa_memblockq_new("reference memblockq", + 0, + MEMBLOCKQ_MAXLENGTH, + 0, + &processor->reference_memblockq_ss, + 0, + processor->reference_process_bytes, + 0, + &silence); + pa_memblock_unref(silence.memblock); -fail: - pa_xfree(processor->intf); - pa_xfree(processor); + pa_log_debug("Created reference memblockq rate(%d), channels(%d), msec(%u), bytes(%zu)", + sample_spec.rate, + sample_spec.channels, + process_msec, + processor->reference_process_bytes); - return NULL; + return 0; } -int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) { - int ret = -1; +int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency) { + pa_memchunk silence; + int64_t write_index, read_index; + size_t n, bytes; pa_assert(processor); + + bytes = pa_usec_to_bytes(latency, &processor->reference_memblockq_ss); + n = (bytes + (processor->reference_process_bytes - 1)) / processor->reference_process_bytes; + + pa_silence_memchunk_get( + &processor->core->silence_cache, + processor->core->mempool, + &silence, + &processor->reference_memblockq_ss, + processor->reference_process_bytes); + + write_index = pa_memblockq_get_write_index(processor->reference_memblockq); + read_index = pa_memblockq_get_read_index(processor->reference_memblockq); + + for (; n > 0; n--) + pa_memblockq_push(processor->reference_memblockq, &silence); + + pa_memblock_unref(silence.memblock); + + pa_log_info("push n(%u) silence blocks. ref_process_bytes(%zu) " + "write_index(%" PRId64 "->%" PRId64 "), read_index(%" PRId64 "->%" PRId64 ")", + pa_memblockq_get_nblocks(processor->reference_memblockq), + processor->reference_process_bytes, + write_index, pa_memblockq_get_write_index(processor->reference_memblockq), + read_index, pa_memblockq_get_read_index(processor->reference_memblockq)); + + return 0; +} + +// TODO naming +int pa_processor_process(pa_processor *processor, pa_memchunk *chunk, pa_memchunk *ochunk) { + int r = -1; + int8_t *recording = NULL; + int8_t *reference = NULL; + int8_t *output = NULL; + bool silence = false; + + pa_memchunk ichunk, rchunk; + + pa_assert(processor); + pa_assert(processor->output_memblockq); + pa_assert(processor->reference_memblockq); pa_assert(processor->intf); - pa_assert(rec); - pa_assert(out); + pa_assert(processor->process_bytes > 0ULL); + pa_assert(processor->reference_process_bytes > 0ULL); + pa_assert(chunk); + pa_assert(ochunk); + + if ((r = pa_memblockq_push(processor->output_memblockq, chunk)) < 0) { + pa_log_error("Failed to push chunk to echo memblockq"); + return r; + } -#ifdef __DEBUG__ - if (write(processor->fdrec, rec, processor->nframes * processor->framesize) <= 0) - pa_log_error("Failed to write rec buffer"); + if ((r = pa_memblockq_peek_fixed_size(processor->reference_memblockq, processor->reference_process_bytes, &rchunk)) < 0) { + pa_log_error("Failed to get memblock from reference memblockq"); + return r; + } + silence = pa_memblock_is_silence(rchunk.memblock); - if (write(processor->fdref, ref, processor->nframes * processor->framesize) <= 0) - pa_log_error("Failed to write ref buffer"); + if ((r = pa_memblockq_peek_fixed_size(processor->output_memblockq, processor->process_bytes, &ichunk)) < 0) { + pa_log_error("Failed to get memblock from output memblockq"); + return r; + } - gettimeofday(&processor->before, NULL); -#endif + ochunk->index = 0; + ochunk->length = ichunk.length; + ochunk->memblock = pa_memblock_new(processor->core->mempool, ochunk->length); - if (processor->intf->process) - ret = processor->intf->process(processor->priv, rec, ref, out); + recording = pa_memblock_acquire(ichunk.memblock); + reference = pa_memblock_acquire(rchunk.memblock); + output = pa_memblock_acquire(ochunk->memblock); -#ifdef __DEBUG__ - if (write(processor->fdout, out, processor->nframes * processor->framesize) <= 0) - pa_log_error("Failed to write out buffer"); + debug_timestamp_begin(processor); - gettimeofday(&processor->after, NULL); + r = processor->intf->process(processor->priv, recording, reference, output); - pa_log_debug("It takes time(%ld) bytes(%d)", - 1000 * (after.tv_sec-before.tv_sec) + (after.tv_usec-before.tv_usec) / 1000, - processor->buffer_size); -#endif + debug_timestamp_end(processor); + debug_write_file(processor, recording, reference, output); + + pa_memblock_release(ichunk.memblock); + pa_memblock_release(rchunk.memblock); + pa_memblock_release(ochunk->memblock); - return ret; + pa_memblock_unref(rchunk.memblock); + pa_memblockq_drop(processor->reference_memblockq, rchunk.length); + + pa_memblock_unref(ichunk.memblock); + pa_memblockq_drop(processor->output_memblockq, ichunk.length); + + pa_log_debug("Post-process. rec(%" PRIu64 "ms), ref(%" PRIu64 "msms) out(%" PRIu64 "ms), " + "silence(%d), windex:rindex(%" PRId64 ":%" PRId64 ")", + pa_bytes_to_usec(ichunk.length, processor->output_ss) / PA_USEC_PER_MSEC, + pa_bytes_to_usec(rchunk.length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC, + pa_bytes_to_usec(ochunk->length, processor->output_ss) / PA_USEC_PER_MSEC, + silence, + pa_memblockq_get_write_index(processor->reference_memblockq), + pa_memblockq_get_read_index(processor->reference_memblockq)); + + return r; +} + +int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk) { + pa_memchunk ochunk; + int r; + + pa_assert(processor); + pa_assert(chunk); + + if (processor->resampler) { + pa_resampler_run(processor->resampler, chunk, &ochunk); + chunk = &ochunk; + } + + if ((r = pa_memblockq_push(processor->reference_memblockq, chunk)) < 0) + pa_log_error("Failed to push chunk to echo memblockq"); + + if (processor->resampler) + pa_memblock_unref(chunk->memblock); + + pa_log_debug("Pushed echo data. bytes(%zu), msec(%" PRIu64 "ms), nblocks(%d) index(%" PRId64 ":%" PRId64 ")", + chunk->length, + pa_bytes_to_usec(chunk->length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC, + pa_memblockq_get_nblocks(processor->reference_memblockq), + pa_memblockq_get_write_index(processor->reference_memblockq), + pa_memblockq_get_read_index(processor->reference_memblockq)); + + return r; +} + +void pa_processor_flush(pa_processor *processor) { + pa_assert(processor); + + if (processor->reference_memblockq) + pa_memblockq_flush_read(processor->reference_memblockq); + + if (processor->output_memblockq) + pa_memblockq_flush_read(processor->output_memblockq); } int pa_processor_free(pa_processor *processor) { pa_assert(processor); + pa_assert(processor->priv); pa_assert(processor->intf); - pa_assert(processor->intf->destroy); - -#ifdef __DEBUG__ - if (processor->fdrec) - close(processor->fdrec); - if (processor->fdref) - close(processor->fdref); - if (processor->fdout) - close(processor->fdout); -#endif - if (processor->intf->destroy(processor->priv)) { + if (processor->intf->destroy(processor->priv) < 0) pa_log_error("Failed to destroy processor"); - return -1; - } - pa_xfree(processor->intf); + if (processor->resampler) + pa_resampler_free(processor->resampler); + + if (processor->reference_memblockq) + pa_memblockq_free(processor->reference_memblockq); + + if (processor->output_memblockq) + pa_memblockq_free(processor->output_memblockq); + + debug_close_file(processor); + pa_xfree(processor); return 0; } -size_t pa_processor_get_blocksize(pa_processor *processor) { - pa_assert(processor); +pa_processor_method_t pa_processor_get_method(const char *request_method, const char *default_method) { + const char *selected; + + if (!request_method || !default_method) + return PA_PROCESSOR_SPEEX; + + selected = pa_streq(request_method, "default") ? default_method : request_method; - return processor->nframes; + if (pa_streq(selected, "speex")) + return PA_PROCESSOR_SPEEX; + else if (pa_streq(selected, "adrian")) + return PA_PROCESSOR_ADRIAN; +#ifdef SUPPORT_METHOD_WEBRTC + else if (pa_streq(selected, "webrtc")) + return PA_PROCESSOR_WEBRTC; +#endif + else if (pa_streq(selected, "reference_copy")) + return PA_PROCESSOR_REFERENCE_COPY; + else + return PA_PROCESSOR_SPEEX; +} + +const char *pa_processor_method_to_string(pa_processor_method_t method) { + if (method >= PA_PROCESSOR_METHOD_MAX) + return NULL; + + return method_table[method].name; +} + +#ifdef __DEBUG__ +static void debug_open_file(pa_processor *processor) { + static int n = 1; + char rec[32], ref[32], out[32]; + + snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n); + snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n); + snprintf(out, sizeof(out), "/tmp/out-%d.raw", n); + n += 1; + + unlink(rec); + unlink(ref); + unlink(out); + + processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777); + processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777); + processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777); +} + +static void debug_timestamp_begin(pa_processor *processor) { + gettimeofday(&processor->before, NULL); +} + +static void debug_timestamp_end(pa_processor *processor) { + gettimeofday(&processor->after, NULL); + + pa_log_debug("It takes time (%ld)ms.", + 1000 * (processor->after.tv_sec - processor->before.tv_sec) + + (processor->after.tv_usec - processor->before.tv_usec) / 1000); +} + +static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) { + if (rec && write(processor->fdrec, rec, processor->process_bytes) <= 0) + pa_log_error("Failed to write recording buffer"); + + if (ref && write(processor->fdref, ref, processor->reference_process_bytes) <= 0) + pa_log_error("Failed to write reference buffer"); + + if (out && write(processor->fdout, out, processor->process_bytes) <= 0) + pa_log_error("Failed to write ref buffer"); +} + +static void debug_close_file(pa_processor *processor) { + if (processor->fdrec) { + close(processor->fdrec); + processor->fdrec = -1; + } + + if (processor->fdref) { + close(processor->fdref); + processor->fdref = -1; + } + + if (processor->fdout) { + close(processor->fdout); + processor->fdout = -1; + } } +#endif + diff --git a/src/echo-cancel/processor.h b/src/echo-cancel/processor.h index fa642ce..fa77e87 100644 --- a/src/echo-cancel/processor.h +++ b/src/echo-cancel/processor.h @@ -1,6 +1,3 @@ -#ifndef foopulseprocessorfoo -#define foopulseprocessorfoo - /*** This file is part of PulseAudio. @@ -22,44 +19,47 @@ USA. ***/ +#ifndef foopulseprocessorfoo +#define foopulseprocessorfoo + #ifdef HAVE_CONFIG_H #include #endif +#include +#include +#include #include -//#define __DEBUG__ - -typedef enum { - PA_PROCESSOR_FLAGS_ECHO_CANCEL, - PA_PROCESSOR_FLAGS_NOISE_SUPPRESSION, -} pa_process_flags_t; - typedef enum { PA_PROCESSOR_SPEEX, PA_PROCESSOR_ADRIAN, +#ifdef SUPPORT_METHOD_WEBRTC PA_PROCESSOR_WEBRTC, -} pa_processor_algo_t; +#endif + PA_PROCESSOR_REFERENCE_COPY, + PA_PROCESSOR_METHOD_MAX, +} pa_processor_method_t; -typedef struct pa_processor_algo pa_processor_algo; typedef struct pa_processor pa_processor; -struct pa_processor { - pa_processor_algo *intf; - void *priv; - void *userdata; - size_t nframes; - size_t framesize; - -#ifdef __DEBUG__ - int fdrec, fdref, fdout; - struct timeval before, after; -#endif -}; - -pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags); -int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out); +pa_processor *pa_processor_new(pa_core *core, + uint32_t process_msec, + pa_sample_spec *output_ss, + pa_channel_map *output_map, + pa_sample_spec *source_ss, + pa_processor_method_t method); +int pa_processor_bind_reference(pa_processor *processor, + uint32_t process_msec, + pa_sample_spec *reference_ss, + pa_channel_map *reference_chmap); +int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency); +int pa_processor_process(pa_processor *processor, pa_memchunk *rec, pa_memchunk *out); +int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk); +void pa_processor_flush(pa_processor *processor); int pa_processor_free(pa_processor *processor); -size_t pa_processor_get_blocksize(pa_processor *processor); +pa_processor_method_t pa_processor_get_method(const char *requset_method, const char *default_method); +const char *pa_processor_method_to_string(pa_processor_method_t method); #endif + diff --git a/src/module-tizenaudio-source2.c b/src/module-tizenaudio-source2.c index 0fc047c..5bd3a61 100644 --- a/src/module-tizenaudio-source2.c +++ b/src/module-tizenaudio-source2.c @@ -279,19 +279,6 @@ static int source_process_msg( return 0; } - case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: { - pa_source_output *o = (pa_source_output *)data; - - if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)) - break; - - if (u->ec_asyncmsgq) { - pa_asyncmsgq_send(u->ec_asyncmsgq, u->ec_object, - PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK, o, 0, NULL); - } - - return 0; - } case PA_SOURCE_MESSAGE_REBUILD_RTPOLL: { struct arguments { pa_msgobject *o;