tizenaudio-echo-cancel: Support reference raw copy functionality 08/273808/8 submit/tizen/20220421.081718
authorJaechul Lee <jcsing.lee@samsung.com>
Tue, 29 Mar 2022 08:06:43 +0000 (17:06 +0900)
committerJaechul Lee <jcsing.lee@samsung.com>
Thu, 21 Apr 2022 06:07:47 +0000 (15:07 +0900)
processing echo-cancellation in module-tizenaudio-echo-cancel moved to processor
modules for supporting reference copy.

 * created reference copy
 * removed delayq

[Version] 15.0.10
[Issue Type] New Feature

Change-Id: Id4529e7f6225b1c3f1d6e499bc915aaef1a940c5
Signed-off-by: Jaechul Lee <jcsing.lee@samsung.com>
Makefile.am
packaging/pulseaudio-modules-tizen.spec
src/echo-cancel/algo_reference_copy.c [new file with mode: 0644]
src/echo-cancel/module-tizenaudio-echo-cancel.c
src/echo-cancel/processor.c
src/echo-cancel/processor.h
src/module-tizenaudio-source2.c

index c41e653..e61e057 100644 (file)
@@ -90,6 +90,7 @@ module_tizenaudio_source2_la_CFLAGS = $(MODULE_CFLAGS) -DPA_MODULE_NAME=module_t
 
 libprocessor_la_SOURCES = \
           src/echo-cancel/algo_speex.c \
+          src/echo-cancel/algo_reference_copy.c \
           src/echo-cancel/algo_adrian.c \
           src/echo-cancel/adrian-aec.c \
           src/echo-cancel/processor.c \
@@ -102,7 +103,7 @@ libprocessor_la_CFLAGS = $(AM_CFLAGS) $(PA_CFLAGS) $(LIBSPEEX_CFLAGS)
 if ENABLE_WEBRTC
 libprocessor_la_SOURCES += src/echo-cancel/algo_webrtc.cpp
 libprocessor_la_LIBADD += $(WEBRTC_LIBS)
-libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -std=c++17
+libprocessor_la_CPPFLAGS = $(WEBRTC_CFLAGS) $(PA_CFLAGS) -DSUPPORT_METHOD_WEBRTC -std=c++17
 endif
 
 module_tizenaudio_echo_cancel_la_SOURCES = src/echo-cancel/module-tizenaudio-echo-cancel.c src/echo-cancel/echo-cancel-def.h
index 38eda7d..2611033 100644 (file)
@@ -2,7 +2,7 @@
 
 Name:             pulseaudio-modules-tizen
 Summary:          Pulseaudio modules for Tizen
-Version:          15.0.9
+Version:          15.0.10
 Release:          0
 Group:            Multimedia/Audio
 License:          LGPL-2.1+
diff --git a/src/echo-cancel/algo_reference_copy.c b/src/echo-cancel/algo_reference_copy.c
new file mode 100644 (file)
index 0000000..2d9cf0b
--- /dev/null
@@ -0,0 +1,106 @@
+/***
+  This file is part of PulseAudio.
+
+  Copyright 2022 Jaechul Lee <jcsing.lee@samsung.com>
+
+  PulseAudio is free software; you can redistribute it and/or modify
+  it under the terms of the GNU Lesser General Public License as published
+  by the Free Software Foundation; either version 2.1 of the License,
+  or (at your option) any later version.
+
+  PulseAudio is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with PulseAudio; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+***/
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdint.h>
+#include <pulse/xmalloc.h>
+#include <pulse/sample.h>
+#include <pulse/channelmap.h>
+#include <pulsecore/log.h>
+#include <pulsecore/macro.h>
+
+struct reference_copy {
+    size_t nframes;
+    pa_sample_spec ss;
+    int reference_channels;
+};
+
+void *reference_copy_create(size_t nframes, pa_sample_spec *ss) {
+    struct reference_copy *rc;
+
+    pa_assert(ss);
+
+    rc = pa_xnew0(struct reference_copy, 1);
+    rc->nframes = nframes;
+    rc->ss = *ss;
+
+    return rc;
+}
+
+int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out) {
+    struct reference_copy *rc = priv;
+    size_t rec_bytes, ref_bytes, actual_bytes, total_bytes;
+    int8_t *dst = out;
+
+    pa_assert(rc);
+    pa_assert(rec);
+    pa_assert(ref);
+    pa_assert(out);
+
+    rec_bytes = pa_frame_size(&rc->ss);
+    ref_bytes = rc->reference_channels * pa_sample_size(&rc->ss);
+    actual_bytes = rec_bytes - ref_bytes;
+    total_bytes = rec_bytes * rc->nframes;
+
+    while ((dst - out) < total_bytes) {
+        memcpy(dst, rec, actual_bytes);
+        memcpy(dst + actual_bytes, ref, ref_bytes);
+        dst += rec_bytes;
+        rec += rec_bytes;
+        ref += ref_bytes;
+    }
+
+    return 0;
+}
+
+int32_t reference_copy_destroy(void *priv) {
+    struct reference_copy *rc = priv;
+
+    pa_assert(rc);
+
+    pa_xfree(rc);
+
+    return 0;
+}
+
+int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map) {
+    struct reference_copy *rc = priv;
+    int channels;
+
+    pa_assert(rc);
+    pa_assert(source_ss);
+    pa_assert(sample_spec);
+    pa_assert(map);
+
+    channels = rc->ss.channels - source_ss->channels;
+    if (channels <= 0)
+        return -1;
+
+    *sample_spec = rc->ss;
+    sample_spec->channels = rc->reference_channels = channels;
+
+    pa_channel_map_init_auto(map, channels, PA_CHANNEL_MAP_AIFF);
+
+    return 0;
+}
index 2f95314..8aa456c 100644 (file)
@@ -46,8 +46,9 @@ PA_MODULE_DESCRIPTION("Tizen Audio Echo Cancel");
 PA_MODULE_VERSION(PACKAGE_VERSION);
 PA_MODULE_LOAD_ONCE(true);
 PA_MODULE_USAGE(
-        "method=<name of method using for echo cancellation> "
-        "blocksize=<the bytes of processing block on source domain> ");
+        "method=<name of method using for echo cancellation> ");
+
+#define DEFAULT_PROCESS_MSEC 10
 
 typedef struct echo_cancel pa_echo_cancel;
 struct userdata {
@@ -67,7 +68,7 @@ struct userdata {
 
     bool enable;
     uint32_t n_source_output;
-    size_t blocksize;
+
     char *default_method;
 
     pa_thread *thread;
@@ -76,8 +77,8 @@ struct userdata {
     pa_echo_cancel *echo_cancel;
 
     /* use in thread */
-    pa_memblockq *delayq;
     bool enable_in_thread;
+    bool triggered;
 
     pa_asyncmsgq *asyncmsgq_sink;
     pa_asyncmsgq *asyncmsgq_source;
@@ -92,51 +93,42 @@ PA_DEFINE_PRIVATE_CLASS(pa_echo_cancel, pa_msgobject);
 #define PA_ECHO_CANCEL(o) (pa_echo_cancel_cast(o))
 
 #define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
-#define CHECK_FLAGS_AEC(x) (x & PA_SOURCE_OUTPUT_ECHO_CANCEL)
-#define CHECK_COUNT_SOURCE_OUTPUT_AEC(x) (x->n_source_output)
 #define DEFAULT_AEC_METHOD "speex"
 
 static const char* const valid_modargs[] = {
     "method",
-    "blocksize",
     NULL,
 };
 
-static int proplist_get_fragment_size(pa_proplist *p, size_t *size) {
-    const char *fragsize;
-    uint32_t blocksize;
+static int proplist_get_fragment_size_usec(pa_proplist *p, pa_sample_spec *sample_spec, pa_usec_t *usec) {
+    const char *prop_fragsize;
+    uint32_t fragsize;
+
+    pa_assert(p);
+    pa_assert(sample_spec);
+    pa_assert(usec);
 
-    if (!(fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
+    if (!(prop_fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
         return -1;
 
-    if (pa_atou(fragsize, &blocksize))
+    if (pa_atou(prop_fragsize, &fragsize))
         return -1;
 
-    *size = blocksize;
+    *usec = pa_bytes_to_usec(fragsize, sample_spec);
 
     return 0;
 }
 
-static int pa_processor_get_method(pa_source_output *o, const char *default_method, pa_processor_algo_t *method) {
-    const char *selected, *requested;
+static int proplist_get_method(pa_proplist *p, const char *default_method, pa_processor_method_t *method) {
+    const char *m;
 
+    pa_assert(p);
     pa_assert(method);
-    pa_assert(default_method);
 
-    requested = pa_proplist_gets(o->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD);
-    if (!requested)
+    if (!(m = pa_proplist_gets(p, PA_PROP_MEDIA_ECHO_CANCEL_METHOD)))
         return -1;
 
-    selected = pa_streq(requested, "default") ? default_method : requested;
-
-    if (pa_streq(selected, "webrtc"))
-        *method = PA_PROCESSOR_WEBRTC;
-    else if (pa_streq(selected, "speex"))
-        *method = PA_PROCESSOR_SPEEX;
-    else if (pa_streq(selected, "adrian"))
-        *method = PA_PROCESSOR_ADRIAN;
-    else
-        *method = PA_PROCESSOR_SPEEX;
+    *method = pa_processor_get_method(m, default_method);
 
     return 0;
 }
@@ -150,38 +142,13 @@ static pa_source_output *find_source_output_by_flags(pa_source *s) {
     while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
         pa_source_output_assert_ref(o);
 
-        if (CHECK_FLAGS_AEC(o->flags))
+        if (o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL)
             break;
     }
 
     return o ? o : NULL;
 }
 
-static void free_source_output_extra_resource(pa_source_output *o) {
-    pa_assert(o);
-
-    if (o->thread_info.processor) {
-        pa_processor_free(o->thread_info.processor);
-        o->thread_info.processor = NULL;
-    }
-
-    if (o->thread_info.resampler2) {
-        pa_resampler_free(o->thread_info.resampler2);
-        o->thread_info.resampler2 = NULL;
-    }
-
-    if (o->thread_info.echo) {
-        pa_memblockq_free(o->thread_info.echo);
-        o->thread_info.echo = NULL;
-    }
-}
-
-static void free_source_output_extra_resource_by_source(pa_source *s) {
-    pa_assert(s);
-
-    free_source_output_extra_resource(find_source_output_by_flags(s));
-}
-
 static pa_usec_t get_round_trip_latency(struct userdata *u) {
     pa_usec_t sink_latency;
     pa_usec_t source_latency;
@@ -189,65 +156,14 @@ static pa_usec_t get_round_trip_latency(struct userdata *u) {
     pa_assert(u);
     pa_assert(u->sink);
 
-    sink_latency = pa_sink_get_latency(u->sink);
-    source_latency = pa_source_get_latency(u->source);
+    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_latency, 0, NULL);
+    pa_asyncmsgq_send(u->source->asyncmsgq, PA_MSGOBJECT(u->source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_latency, 0, NULL);
 
-    pa_log_info("sink latency (%llu), source latency(%llu)", sink_latency, source_latency);
+    pa_log_info("sink latency (%" PRIu64 "), source latency(%" PRIu64 ")", sink_latency, source_latency);
 
     return sink_latency + source_latency;
 }
 
-static int setup_delayq_latency(struct userdata *u, pa_usec_t latency) {
-    int64_t write_index, read_index;
-    size_t bytes, blocksize, n;
-    pa_memchunk silence;
-
-    if (proplist_get_fragment_size(u->sink->proplist, &blocksize))
-        return -1;
-
-    if (u->delayq)
-        pa_memblockq_free(u->delayq);
-
-    u->delayq = pa_memblockq_new("echo reference delay",
-                                    0,
-                                    MEMBLOCKQ_MAXLENGTH,
-                                    0,
-                                    &u->sink->sample_spec,
-                                    0,
-                                    blocksize,
-                                    0,
-                                    NULL);
-
-    bytes = pa_usec_to_bytes(latency, &u->sink->sample_spec);
-    n = (bytes + blocksize - 1) / blocksize;
-
-    pa_silence_memchunk_get(
-            &u->sink->core->silence_cache,
-            u->sink->core->mempool,
-            &silence,
-            &u->sink->sample_spec,
-            blocksize);
-
-    if (!silence.memblock)
-        return -1;
-
-    write_index = pa_memblockq_get_write_index(u->delayq);
-    read_index = pa_memblockq_get_read_index(u->delayq);
-
-    pa_memblockq_flush_write(u->delayq, true);
-    while (n-- > 0)
-        pa_memblockq_push(u->delayq, &silence);
-
-    pa_log_info("push n(%d) blocks. write_index(%llu->%llu), read_index(%llu->%llu)",
-                    pa_memblockq_get_nblocks(u->delayq),
-                    write_index, pa_memblockq_get_write_index(u->delayq),
-                    read_index, pa_memblockq_get_read_index(u->delayq));
-
-    pa_memblock_unref(silence.memblock);
-
-    return 0;
-}
-
 static int send_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsgq *q) {
     struct arguments {
         pa_msgobject *o;
@@ -277,48 +193,65 @@ static int send_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsg
 }
 
 /* Call from main thread */
-static void set_aec_state(struct userdata *u, bool enable) {
+static void broadcast_echo_cancel_state(struct userdata *u, pa_source_output *o, bool enable) {
     void *v[2];
-    pa_usec_t latency = 0ULL;
 
     pa_assert(u);
     pa_assert(u->source);
-    pa_assert(u->sink); /* not allow sink null */
-
-    pa_log_info("set_aec state %d -> %d", u->enable, enable);
-
-    if (u->enable == enable)
-        return;
+    pa_assert(u->sink);
 
-    latency = enable ? get_round_trip_latency(u) : 0ULL;
+    if (enable) {
+        send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source);
+        send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink);
+    }
 
     v[0] = (void *)enable;
-    v[1] = &latency;
+    v[1] = o;
 
-    /* There is a race condition between the source thread and the render thread.
-     * pa_source_post function can be overlapped at the same time */
     pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->echo_cancel),
                         PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE, (void *)v, 0, NULL);
 
-    if (u->sink)
-        pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink),
-                            PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
+    pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink),
+                        PA_SINK_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
 
     pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source),
                         PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)enable, 0, NULL, NULL);
 
+    if (!enable) {
+        send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
+        send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL);
+    }
+}
+
+static void set_echo_cancel_state(struct userdata *u, bool enable) {
+    pa_source_output *o;
+
+    pa_assert(u);
+    pa_assert(u->source);
+    pa_assert(u->sink);
+
+    if (u->enable == enable)
+        return;
+
+    o = find_source_output_by_flags(u->source);
+    if (!o) {
+        pa_log_error("Failed to find EC source-output");
+        return;
+    }
+
+    broadcast_echo_cancel_state(u, o, enable);
     u->enable = enable;
 
-    pa_log_info("AEC state updated. enable(%d)", u->enable);
+    pa_log_info("AEC state is changed. enable(%d)", u->enable);
 }
 
 static int update_state_by_sink(struct userdata *u, bool enable) {
     pa_assert(u);
 
-    if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) == 0)
+    if (u->n_source_output == 0)
         return 0;
 
-    set_aec_state(u, enable);
+    set_echo_cancel_state(u, enable);
 
     return 0;
 }
@@ -329,37 +262,18 @@ static int update_state_by_source(struct userdata *u, bool enable) {
     if (enable) {
         if (u->n_source_output++ == 0) {
             if (!u->sink || PA_SINK_IS_RUNNING(u->sink->state))
-                set_aec_state(u, enable);
+                set_echo_cancel_state(u, enable);
         }
     } else {
         if (--u->n_source_output == 0)
-            set_aec_state(u, enable);
+            set_echo_cancel_state(u, enable);
     }
 
     return 0;
 }
 
-static int unlink_source_output_in_thread(pa_source_output *o) {
-    pa_assert(o);
-
-    free_source_output_extra_resource(o);
-
-    /* source-output should be remove by render thread to prevent race condition.
-     * 1. invoke REMOVE message
-     * 2. receive the message in tizenaudio-source
-     * 3. send the message to tizenaudio-echo-cancel
-     * 4. remove source-output in render thread
-     */
-    pa_source_process_msg(PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, 0, NULL);
-
-    return 0;
-}
-
 static void pa_source_push_echo(pa_source *s, pa_memchunk *chunk) {
     pa_source_output *o = NULL;
-    pa_memchunk ochunk;
-    bool nf = false;
-    int r;
 
     o = find_source_output_by_flags(s);
     if (!o) {
@@ -367,129 +281,19 @@ static void pa_source_push_echo(pa_source *s, pa_memchunk *chunk) {
         return;
     }
 
-    if (o->thread_info.resampler2) {
-        pa_resampler_run(o->thread_info.resampler2, chunk, &ochunk);
-        chunk = &ochunk;
-        nf = true;
-    }
-
-    r = pa_memblockq_push(o->thread_info.echo, chunk);
-    if (r != 0)
-        pa_log_error("Failed to push chunk to memblockq");
-
-    if (nf)
-        pa_memblock_unref(chunk->memblock);
-
-    pa_log_debug("Pushed echo data. index(%u) size(%llums), nblocks(%d) index(%lld:%lld)",
-                    o->index,
-                    pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC,
-                    pa_memblockq_get_nblocks(o->thread_info.echo),
-                    pa_memblockq_get_write_index(o->thread_info.echo),
-                    pa_memblockq_get_read_index(o->thread_info.echo));
-}
-
-static void flush_echo_memblockq(pa_source *s) {
-    pa_source_output *o;
-
-    o = find_source_output_by_flags(s);
-    if (!o) {
-        pa_log_error("Can't find aec source-output");
-        return;
-    }
-
-    pa_memblockq_flush_write(o->thread_info.echo, true);
+    if (pa_processor_push_data(o->thread_info.processor, chunk) < 0)
+        pa_log_error("Failed to push reference data");
 }
 
 static int post_process(pa_source_output *o, pa_memchunk *chunk, pa_memchunk *ochunk) {
     int ret = -1;
-    int8_t *rec, *ref, *out;
-    size_t blocksize;
-
-    pa_memchunk tchunk;
-
-    if (!o->thread_info.processor) {
-        pa_log_error("Failed to get processor");
-        return ret;
-    }
-
-    /*
-     * pre-condition
-     * sink block >= source block >= blocksize * n
-     * if blocksize is not described blocksize, It should be same as source's fragment size
-     *
-     * chunk must be processed every data.
-     */
 
-    /* reference exist */
-    if (o->thread_info.echo) {
-        size_t block_bytes, length, n;
-
-        /* echo queue is not ready that means reference is not started */
-        if (pa_memblockq_is_empty(o->thread_info.echo))
-            return ret;
-
-        blocksize = pa_processor_get_blocksize(o->thread_info.processor);
-        block_bytes = blocksize * pa_frame_size(&o->sample_spec);
-
-        if (chunk->length % block_bytes) {
-            pa_log_warn("Skip to process aec. chunk size must be multiple of blocksize");
-            return -1;
-        }
-
-        n = chunk->length / block_bytes;
-        length = n * block_bytes;
-
-        if (!(ret = pa_memblockq_peek_fixed_size(o->thread_info.echo, length, &tchunk))) {
-            int i;
-
-            ochunk->index = 0;
-            ochunk->length = length;
-            ochunk->memblock = pa_memblock_new(o->core->mempool, length);
-
-            rec = pa_memblock_acquire(chunk->memblock);
-            ref = pa_memblock_acquire(tchunk.memblock);
-            out = pa_memblock_acquire(ochunk->memblock); /* TODO: buffer can be shared rec buffer */
-
-            for (i=0; i<n; i++)
-                pa_processor_process(o->thread_info.processor,
-                                        rec + (i * block_bytes),
-                                        ref + (i * block_bytes),
-                                        out + (i * block_bytes));
-
-            pa_memblock_release(chunk->memblock);
-            pa_memblock_release(tchunk.memblock);
-            pa_memblock_release(ochunk->memblock);
-
-            pa_log_debug("Post-process. i(%u), rec(%llums), ref(%llums) "
-                            "block(%llums), process(%llums) * n(%d) "
-                            "silence(%d), index(%lld:%lld)",
-                            o->index,
-                            pa_bytes_to_usec(chunk->length, &o->sample_spec) / PA_USEC_PER_MSEC,
-                            pa_bytes_to_usec(tchunk.length, &o->sample_spec) / PA_USEC_PER_MSEC,
-                            pa_bytes_to_usec(length, &o->sample_spec) / PA_USEC_PER_MSEC,
-                            pa_bytes_to_usec(block_bytes, &o->sample_spec) / PA_USEC_PER_MSEC,
-                            n,
-                            pa_memblock_is_silence(tchunk.memblock),
-                            pa_memblockq_get_write_index(o->thread_info.echo),
-                            pa_memblockq_get_read_index(o->thread_info.echo));
-
-            pa_memblock_unref(tchunk.memblock);
-            pa_memblockq_drop(o->thread_info.echo, tchunk.length);
-        }
-    } else {
-        /* no reference case like audio_share */
-        rec = pa_memblock_acquire(chunk->memblock);
-
-        ochunk->index = 0;
-        ochunk->length = chunk->length;
-        ochunk->memblock = pa_memblock_new(o->core->mempool, chunk->length);
-        out = pa_memblock_acquire(ochunk->memblock);
-
-        pa_processor_process(o->thread_info.processor, rec, NULL, out);
+    pa_assert(o);
+    pa_assert(chunk);
+    pa_assert(ochunk);
 
-        pa_memblock_release(chunk->memblock);
-        pa_memblock_release(ochunk->memblock);
-    }
+    if ((ret = pa_processor_process(o->thread_info.processor, chunk, ochunk)) < 0)
+        pa_log_error("Failed to process data");
 
     return ret;
 }
@@ -504,86 +308,80 @@ static int process_msg(
 
     struct userdata *u = PA_ECHO_CANCEL(o)->u;
 
-    /* thread that pushes ref data should be called in render thread because of thread safe */
-    switch (code) {
-        case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA: {
-            /* a few pcm data will get lost. */
-            if (!u->enable_in_thread)
-                return 0;
+    /* trigger resolves a race condition related to post_process between source and render thread */
+    if (u->triggered) {
+        if (code == PA_ECHO_CANCEL_MESSAGE_PUSH_DATA || code == PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO) {
+            pa_source_output *o = NULL;
+            pa_usec_t latency;
 
-            pa_source_post(u->source, chunk);
+            o = find_source_output_by_flags(u->source);
+            o->post_process = post_process;
 
-            return 0;
-        }
-        case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO: {
-            pa_memchunk ochunk;
+            latency = get_round_trip_latency(u);
+
+            if (pa_processor_setup_reference_memblockq_padding(o->thread_info.processor, latency) < 0)
+                pa_log_warn("Failed to setup reference memblockq padding");
 
-            if (!u->enable_in_thread)
-                return 0;
+            u->triggered = false;
 
-            pa_memblockq_push(u->delayq, chunk);
+            pa_log_info("Triggered Echo-Cancellation. index(%d), latency(%" PRIu64 ") usec", o->index, latency);
+        }
+    }
 
-            if (!pa_memblockq_peek(u->delayq, &ochunk)) {
-                pa_source_push_echo(u->source, &ochunk);
+    /* thread that pushes ref data should be called in render thread because of thread safe */
+    switch (code) {
+        case PA_ECHO_CANCEL_MESSAGE_PUSH_DATA:
+            if (u->enable_in_thread)
+                pa_source_post(u->source, chunk);
 
-                pa_memblock_unref(ochunk.memblock);
-                pa_memblockq_drop(u->delayq, ochunk.length);
-            }
+            break;
+        case PA_ECHO_CANCEL_MESSAGE_PUSH_ECHO:
+            if (u->enable_in_thread)
+                pa_source_push_echo(u->source, chunk);
 
-            return 0;
-        }
-        case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE : {
+            break;
+        case PA_ECHO_CANCEL_MESSAGE_SET_AEC_STATE: {
             void **v = (void **)data;
-            pa_usec_t latency;
+            bool enable = (bool)v[0];
+            pa_source_output *o = (pa_source_output *)v[1];
 
-            u->enable_in_thread = !!(int)v[0];
-            latency = *(pa_usec_t *)v[1];
+            u->enable_in_thread = enable;
 
-            if (u->enable_in_thread) {
-                if (setup_delayq_latency(u, latency)) {
-                    pa_log_error("Failed to init delayq");
-                    return 0;
-                }
+            if (enable) {
+                u->triggered = true;
             } else {
-                if (u->delayq) {
-                    pa_memblockq_free(u->delayq);
-                    u->delayq = NULL;
-                }
-
-                flush_echo_memblockq(u->source);
+                pa_processor_flush(o->thread_info.processor);
+                o->post_process = NULL;
             }
 
-            pa_log_info("EC state change (%d)", u->enable_in_thread);
-
-            return 0;
+            break;
         }
-        case PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK:
-            unlink_source_output_in_thread((pa_source_output *)data);
-            return 0;
         default:
-            return 0;
+            break;
     }
+
+    return 0;
 }
 
 static pa_hook_result_t source_output_new_cb(pa_core *c, pa_source_output_new_data *data, void *userdata) {
     struct userdata *u = (struct userdata *)userdata;
-    const char *method;
+    pa_processor_method_t method;
 
     pa_assert(c);
     pa_assert(u);
     pa_assert(data);
 
-    method = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_ECHO_CANCEL_METHOD);
-    if (!method)
-        return PA_HOOK_OK;
-
-    if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) {
-        pa_log_error("Not allow multi aec instance");
+    if (proplist_get_method(data->proplist, u->default_method, &method) < 0)
         return PA_HOOK_OK;
-    }
 
-    data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL;
+    /* TODO: source-output can be moved */
     data->flags |= PA_SOURCE_OUTPUT_DONT_MOVE;
+    data->flags |= PA_SOURCE_OUTPUT_ECHO_CANCEL;
+
+    if (method == PA_PROCESSOR_REFERENCE_COPY)
+        data->flags |= PA_SOURCE_OUTPUT_NO_REMAP;
+
+    pa_log_info("echo-cancel source-output will be created. method(%d)", method);
 
     // TODO:add check limitation VARIOUS_RATE?
     return PA_HOOK_OK;
@@ -613,72 +411,31 @@ static pa_sink *find_reference_sink_by_proplist(pa_core *c, pa_proplist *p) {
     return s;
 }
 
-static int check_latency_validation(struct userdata *u, pa_sink *sink, pa_source *source) {
-    pa_usec_t sink_usec;
-    pa_usec_t source_usec;
-    pa_usec_t block_usec;
-    size_t blocksize;
-
-    if (proplist_get_fragment_size(source->proplist, &blocksize)) {
-        pa_log_debug("Failed to get blocksize from source");
-        return -1;
-    }
-
-    source_usec = pa_bytes_to_usec(blocksize, &source->sample_spec);
-    block_usec = u->blocksize ? pa_bytes_to_usec(u->blocksize, &source->sample_spec) : source_usec;
-
-    /*
-     * limitation
-     * sink block >= source block >= blocksize * n
-     */
-    if (source_usec < block_usec) {
-        pa_log_debug("Need to check period size. source >= block * n. "
-                    "source(%llums), block_usec(%llums)",
-                        source_usec / PA_USEC_PER_MSEC,
-                        block_usec / PA_USEC_PER_MSEC);
-        return -1;
-    }
-
-    if (!u->sink)
-        return 0;
-
-    if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) {
-        pa_log_debug("Failed to get blocksize from sink");
-        return -1;
-    }
-
-    sink_usec = pa_bytes_to_usec(blocksize, &u->sink->sample_spec);
-
-    if (source_usec > sink_usec || sink_usec < block_usec) {
-        pa_log_debug("Need to check period size. sink >= source >= block * n. "
-                    "source(%llums) sink(%llums) block_usec(%llums)",
-                        source_usec / PA_USEC_PER_MSEC,
-                        sink_usec / PA_USEC_PER_MSEC,
-                        block_usec / PA_USEC_PER_MSEC);
-        return -1;
-    }
-
-    return 0;
-}
-
 static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, void *userdata) {
     struct userdata *u = (struct userdata *)userdata;
-    size_t blocksize = u->blocksize;
-    pa_processor_algo_t method;
+    pa_processor_method_t method;
+    pa_usec_t process_usec;
+    pa_usec_t sink_process_usec;
+    int r;
 
     pa_assert(c);
     pa_assert(o);
     pa_assert(u);
     pa_assert(o->source);
 
-    if (!CHECK_FLAGS_AEC(o->flags))
+    if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
         return PA_HOOK_OK;
 
-    if (CHECK_COUNT_SOURCE_OUTPUT_AEC(u) > 0) {
+    if (u->n_source_output > 0) {
         pa_log_error("Not allow multi aec instance");
         goto fail;
     }
 
+    if (proplist_get_method(o->proplist, u->default_method, &method) < 0) {
+        pa_log_error("Failed to get method");
+        goto fail;
+    }
+
     u->source = o->source;
     u->sink = find_reference_sink_by_proplist(c, o->proplist);
     if (!u->sink) {
@@ -686,83 +443,46 @@ static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, vo
         goto fail;
     }
 
-    if (check_latency_validation(u, u->sink, u->source)) {
-        pa_log_error("Failed to check latency validation");
+    /* Get period size of sink and source */
+    if (proplist_get_fragment_size_usec(u->source->proplist, &u->source->sample_spec, &process_usec) < 0) {
+        pa_log_error("Failed to get fragment usec");
         goto fail;
     }
 
-    /* Use the sources fragment size if blocksize is not specified */
-    if (!blocksize) {
-        if (proplist_get_fragment_size(u->source->proplist, &blocksize)) {
-            pa_log_error("Failed to get blocksize");
-            goto fail;
-        }
+    if (proplist_get_fragment_size_usec(u->sink->proplist, &u->sink->sample_spec, &sink_process_usec) < 0) {
+        pa_log_error("Failed to get fragment usec");
+        goto fail;
     }
 
-    if (o->thread_info.resampler)
-        blocksize = pa_resampler_result(o->thread_info.resampler, blocksize);
-
-    if (pa_processor_get_method(o, u->default_method, &method)) {
-        pa_log_error("Can't find method");
+    if (sink_process_usec < process_usec) {
+        pa_log_error("sink process usec should be bigger than source");
         goto fail;
     }
 
-    o->thread_info.processor = pa_processor_new(blocksize / pa_frame_size(&o->sample_spec),
+    o->thread_info.processor = pa_processor_new(c, process_usec / PA_USEC_PER_MSEC,
                                                 &o->sample_spec,
-                                                method, PA_PROCESSOR_FLAGS_ECHO_CANCEL);
+                                                &o->channel_map,
+                                                &u->source->sample_spec,
+                                                method);
     if (!o->thread_info.processor) {
         pa_log_error("Failed to create pa_processor. echo-cancellation will be disabled");
         goto fail;
     }
 
-    if (u->sink) {
-        if (proplist_get_fragment_size(u->sink->proplist, &blocksize)) {
-            pa_log_error("Failed to get blocksize");
-            goto fail;
-        }
-
-        if (!pa_sample_spec_equal(&u->sink->sample_spec, &o->sample_spec)) {
-            pa_resampler *resampler2;
-
-            resampler2 = pa_resampler_new(
-                    c->mempool,
-                    &u->sink->sample_spec, &u->sink->channel_map,
-                    &o->sample_spec, &o->channel_map,
-                    c->lfe_crossover_freq,
-                    c->resample_method, 0);
-
-            if (!resampler2) {
-                pa_log_error("Failed to allocate resampler2 for echo-cancel");
-                goto fail;
-            }
-
-            o->thread_info.resampler2 = resampler2;
-            blocksize = pa_resampler_result(o->thread_info.resampler2, blocksize);
-
-            pa_log_info("Use resampler2. blocksize(%d) bytes", blocksize);
-        }
-
-        o->thread_info.echo = pa_memblockq_new("echo reference",
-                                                0,
-                                                MEMBLOCKQ_MAXLENGTH,
-                                                0,
-                                                &o->sample_spec,
-                                                0,
-                                                blocksize,
-                                                0,
-                                                &o->source->silence);
-
-        if (!o->thread_info.echo) {
-            pa_log_error("Failed to alloc memblockq");
-            goto fail;
-        }
+    r = pa_processor_bind_reference(o->thread_info.processor,
+                                    sink_process_usec / PA_USEC_PER_MSEC,
+                                    &u->sink->sample_spec,
+                                    &u->sink->channel_map);
+    if (r < 0) {
+        pa_log_error("Failed to bind reference source");
+        goto fail;
     }
 
-    o->post_process = post_process;
-
-    /* connect to sink and source */
-    send_rebuild_rtpoll(PA_MSGOBJECT(u->source), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_source);
-    send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), PA_MSGOBJECT(u->echo_cancel), u->asyncmsgq_sink);
+    pa_log_info("echo-cancel source-output(%u) created. process_msec(%u), sink_process_msec(%u), method(%s)",
+                        o->index,
+                        (uint32_t)(process_usec / PA_USEC_PER_MSEC),
+                        (uint32_t)(sink_process_usec / PA_USEC_PER_MSEC),
+                        pa_processor_method_to_string(method));
 
     update_state_by_source(u, true);
 
@@ -770,30 +490,45 @@ static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, vo
 
 fail:
     o->flags &= ~PA_SOURCE_OUTPUT_ECHO_CANCEL; // TODO: need to consider DONT_MOVE define
-    free_source_output_extra_resource(o);
+    if (o->thread_info.processor)
+        pa_processor_free(o->thread_info.processor);
 
     return PA_HOOK_OK;
 }
 
 /* Call from main thread */
-static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
+static pa_hook_result_t source_output_unlink_cb(pa_core *c, pa_source_output *o, void *userdata) {
     struct userdata *u = (struct userdata *)userdata;
 
     pa_assert(c);
     pa_assert(o);
     pa_assert(u);
 
-    if (!CHECK_FLAGS_AEC(o->flags))
+    if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
         return PA_HOOK_OK;
 
     update_state_by_source(u, false);
 
-    send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
-    send_rebuild_rtpoll(PA_MSGOBJECT(u->sink), NULL, NULL);
+    return PA_HOOK_OK;
+}
+
+static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
+    struct userdata *u = (struct userdata *)userdata;
+
+    pa_assert(c);
+    pa_assert(o);
+    pa_assert(u);
+
+    if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
+        return PA_HOOK_OK;
+
+    pa_processor_free(o->thread_info.processor);
 
     u->source = NULL;
     u->sink = NULL;
 
+    pa_log_info("echo-cancel source-output(%u) is unlinked", o->index);
+
     return PA_HOOK_OK;
 }
 
@@ -877,7 +612,6 @@ fail:
 int pa__init(pa_module *m) {
     pa_modargs *ma = NULL;
     struct userdata *u = NULL;
-    uint32_t blocksize = 0;
 
     pa_assert(m);
 
@@ -886,15 +620,9 @@ int pa__init(pa_module *m) {
         return -1;
     }
 
-    if (pa_modargs_get_value_u32(ma, "blocksize", &blocksize) < 0) {
-        pa_log_info("Failed to get blocksize");
-        goto fail;
-    }
-
     m->userdata = u = pa_xnew0(struct userdata, 1);
     u->core = m->core;
     u->m = m;
-    u->blocksize = blocksize;
     u->default_method = pa_xstrdup(pa_modargs_get_value(ma, "method", DEFAULT_AEC_METHOD));
 
     u->echo_cancel = pa_msgobject_new(pa_echo_cancel);
@@ -925,6 +653,10 @@ int pa__init(pa_module *m) {
         pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT],
                     PA_HOOK_EARLY, (pa_hook_cb_t) source_output_put_cb, u);
 
+    u->source_output_unlink_slot =
+        pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK],
+                    PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_cb, u);
+
     u->source_output_unlink_post_slot =
         pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST],
                     PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_post_cb, u);
@@ -989,19 +721,20 @@ void pa__done(pa_module *m) {
 
     if (u->asyncmsgq_source) {
         if (u->source) {
+            pa_source_output *o;
+
             pa_asyncmsgq_post(u->source->asyncmsgq, PA_MSGOBJECT(u->source),
                     PA_SOURCE_MESSAGE_SET_AEC_STATE, (void *)false, 0, NULL, NULL);
 
             send_rebuild_rtpoll(PA_MSGOBJECT(u->source), NULL, NULL);
-            free_source_output_extra_resource_by_source(u->source);
+
+            if ((o = find_source_output_by_flags(u->source)))
+                pa_processor_free(o->thread_info.processor);
         }
 
         pa_asyncmsgq_unref(u->asyncmsgq_source);
     }
 
-    if (u->delayq)
-        pa_memblockq_free(u->delayq);
-
     if (u->rtpoll)
         pa_rtpoll_free(u->rtpoll);
 
index 3681347..a97b99c 100644 (file)
 #endif
 
 #include <pulse/xmalloc.h>
+#include <pulse/timeval.h>
+#include <pulse/channelmap.h>
 #include <pulsecore/log.h>
 #include <pulsecore/macro.h>
+#include <pulsecore/resampler.h>
+#include <pulsecore/core-util.h>
 
 #include "processor.h"
 
+//#define __DEBUG__
 #ifdef __DEBUG__
 #include <stdio.h>
 #include <sys/time.h>
 #include <unistd.h>
 #endif
 
-struct pa_processor_algo {
+#define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
+
+typedef struct pa_processor_method_interface pa_processor_method_interface;
+struct pa_processor {
+    pa_core *core;
+    pa_processor_method_interface *intf;
+    pa_processor_method_t method;
+    void *priv;
+
+    size_t process_frames;
+    pa_usec_t process_usec;
+    pa_usec_t reference_process_usec;
+    size_t process_bytes;
+    size_t reference_process_bytes;
+
+    pa_sample_spec *output_ss;
+    pa_channel_map *output_chmap;
+    pa_sample_spec *source_ss;
+    pa_sample_spec reference_memblockq_ss;
+
+    pa_resampler *resampler;
+    pa_memblockq *reference_memblockq;
+    pa_memblockq *output_memblockq;
+
+#ifdef __DEBUG__
+    int fdrec, fdref, fdout;
+    struct timeval before, after;
+#endif
+};
+
+struct pa_processor_method_interface {
+    const char *name;
     void *(*create)(size_t nframes, pa_sample_spec *ss);
     int32_t (*process)(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
     int32_t (*destroy)(void *priv);
+    int32_t (*change_reference_spec)(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map);
 };
 
 extern void *adrian_create(size_t nframes, pa_sample_spec *ss);
@@ -51,145 +88,463 @@ extern void *speex_create(size_t nframes, pa_sample_spec *ss);
 extern int32_t speex_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
 extern int32_t speex_destroy(void *priv);
 
+#ifdef SUPPORT_METHOD_WEBRTC
 extern void *webrtc_audio_create(size_t nframes, pa_sample_spec *ss);
 extern int32_t webrtc_audio_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
 extern int32_t webrtc_audio_destroy(void *priv);
+#endif
 
-pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags) {
-    pa_processor *processor = NULL;
+extern void *reference_copy_create(size_t nframes, pa_sample_spec *ss);
+extern int32_t reference_copy_process(void *priv, int8_t *rec, int8_t *ref, int8_t *out);
+extern int32_t reference_copy_destroy(void *priv);
+extern int32_t reference_copy_change_reference_spec(void *priv, pa_sample_spec *source_ss, pa_sample_spec *sample_spec, pa_channel_map *map);
 
-    pa_assert(ss);
+#ifdef __DEBUG__
+static void debug_open_file(pa_processor *processor);
+static void debug_timestamp_begin(pa_processor *processor);
+static void debug_timestamp_end(pa_processor *processor);
+static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out);
+static void debug_close_file(pa_processor *processor);
+#else
+#define debug_open_file(x)
+#define debug_timestamp_begin(x)
+#define debug_timestamp_end(x)
+#define debug_write_file(x, a, b, c)
+#define debug_close_file(x)
+#endif
 
-    if (ss->format != PA_SAMPLE_S16LE) {
-        pa_log_error("Not supported format(%d)", ss->format);
+static struct pa_processor_method_interface method_table[PA_PROCESSOR_METHOD_MAX] = {
+    {
+        "speex",
+        speex_create,
+        speex_process,
+        speex_destroy,
+        NULL,
+    },
+    {
+        "adrian",
+        adrian_create,
+        adrian_process,
+        adrian_destroy,
+        NULL,
+    },
+#ifdef SUPPORT_METHOD_WEBRTC
+    {
+        "webrtc",
+        webrtc_audio_create,
+        webrtc_audio_process,
+        webrtc_audio_destroy,
+        NULL,
+    },
+#endif
+    {
+        "reference_copy",
+        reference_copy_create,
+        reference_copy_process,
+        reference_copy_destroy,
+        reference_copy_change_reference_spec,
+    },
+};
+
+static size_t pa_processor_usec_to_frame(pa_usec_t usec, pa_sample_spec *spec) {
+    pa_assert(spec);
+
+    return pa_usec_to_bytes(usec, spec) / pa_frame_size(spec);
+}
+
+pa_processor *pa_processor_new(pa_core *core,
+                                uint32_t process_msec,
+                                pa_sample_spec *output_ss,
+                                pa_channel_map *output_map,
+                                pa_sample_spec *source_ss,
+                                pa_processor_method_t method) {
+    pa_processor *processor;
+    pa_memchunk silence;
+
+    pa_assert(core);
+    pa_assert(output_ss);
+    pa_assert(output_map);
+    pa_assert(source_ss);
+    pa_assert(method < PA_PROCESSOR_METHOD_MAX);
+
+    processor = pa_xnew0(pa_processor, 1);
+    processor->intf = &method_table[method];
+    processor->core = core;
+    processor->process_usec = process_msec * PA_USEC_PER_MSEC;
+    processor->output_ss = output_ss;
+    processor->output_chmap = output_map;
+    processor->source_ss = source_ss;
+    processor->method = method;
+    processor->process_frames = pa_processor_usec_to_frame(processor->process_usec, processor->output_ss);
+    processor->process_bytes = pa_usec_to_bytes(processor->process_usec, processor->output_ss);
+
+    if (!(processor->priv = processor->intf->create(processor->process_frames, output_ss))) {
+        pa_log_error("Failed to create processor. rate(%d), channels(%d).", output_ss->rate, output_ss->channels);
+        pa_xfree(processor);
         return NULL;
     }
 
-    processor = pa_xnew0(pa_processor, 1);
-    processor->intf = pa_xnew0(pa_processor_algo, 1);
-    processor->nframes = nframes;
-    processor->framesize = pa_frame_size(ss);
-
-    switch (backend) {
-        case PA_PROCESSOR_SPEEX:
-            processor->intf->create = speex_create;
-            processor->intf->process = speex_process;
-            processor->intf->destroy = speex_destroy;
-            break;
-        case PA_PROCESSOR_ADRIAN:
-            processor->intf->create = adrian_create;
-            processor->intf->process = adrian_process;
-            processor->intf->destroy = adrian_destroy;
-            break;
-        case PA_PROCESSOR_WEBRTC:
-            processor->intf->create = webrtc_audio_create;
-            processor->intf->process = webrtc_audio_process;
-            processor->intf->destroy = webrtc_audio_destroy;
-            break;
-        default:
-            pa_log_error("Invalid backend(%d)", backend);
-            goto fail;
+    pa_silence_memchunk_get(&core->silence_cache, core->mempool, &silence, output_ss, 0);
+    processor->output_memblockq = pa_memblockq_new("source-output memblockq",
+                                                    0,
+                                                    MEMBLOCKQ_MAXLENGTH,
+                                                    0,
+                                                    processor->output_ss,
+                                                    0,
+                                                    pa_usec_to_bytes(processor->process_usec, output_ss),
+                                                    0,
+                                                    &silence);
+    pa_memblock_unref(silence.memblock);
+
+    pa_log_info("Created processor. memblockq rate(%d), channels(%d), process_msec(%u), "
+                "process_bytes(%zu), method(%s), source rate(%d), channels(%d)",
+                                                    output_ss->rate,
+                                                    output_ss->channels,
+                                                    process_msec,
+                                                    pa_usec_to_bytes(processor->process_usec, output_ss),
+                                                    method_table[method].name,
+                                                    source_ss->rate,
+                                                    source_ss->channels);
+
+    debug_open_file(processor);
+
+    return processor;
+}
+
+int pa_processor_bind_reference(pa_processor *processor,
+                                uint32_t process_msec,
+                                pa_sample_spec *reference_ss,
+                                pa_channel_map *reference_chmap) {
+
+    pa_sample_spec sample_spec;
+    pa_channel_map channel_map;
+    pa_memchunk silence;
+
+    pa_assert(processor);
+    pa_assert(processor->intf);
+    pa_assert(processor->output_ss);
+    pa_assert(processor->output_chmap);
+    pa_assert(reference_ss);
+    pa_assert(reference_chmap);
+
+    processor->reference_process_usec = process_msec * PA_USEC_PER_MSEC;
+    if (processor->reference_process_usec < processor->process_usec) {
+        pa_log_error("Failed to bind reference. ref_usec(%" PRId64 "), process_usec(%" PRId64 ")",
+                        processor->reference_process_usec, processor->process_usec);
+        return -1;
     }
 
-    pa_log_info("Use backend(%d) nframes(%zu) framesize(%d)",
-                    backend, processor->nframes, processor->framesize);
+    sample_spec = *processor->output_ss;
+    channel_map = *processor->output_chmap;
 
-    if (!(processor->priv = processor->intf->create(nframes, ss))) {
-        pa_log_error("Failed to create processor");
-        goto fail;
+    /* select reference memblockq sample_spec and channelmap */
+    if (processor->intf->change_reference_spec) {
+        if (processor->intf->change_reference_spec(processor->priv, processor->source_ss, &sample_spec, &channel_map) < 0) {
+            pa_log_error("Failed to get reference info");
+            return -1;
+        }
     }
 
-#ifdef __DEBUG__
-    {
-        static int n = 1;
-        char rec[32], ref[32], out[32];
+    /* Create resampler */
+    if (!pa_sample_spec_equal(reference_ss, &sample_spec)) {
+        if (processor->resampler)
+            pa_resampler_free(processor->resampler);
+
+        processor->resampler = pa_resampler_new(processor->core->mempool,
+                                                reference_ss, reference_chmap,
+                                                &sample_spec, &channel_map,
+                                                processor->core->lfe_crossover_freq,
+                                                processor->core->resample_method, 0);
+        if (!processor->resampler) {
+            pa_log_error("Failed to allocate reference resampler");
+            return -1;
+        }
+    }
 
-        snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n);
-        snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n);
-        snprintf(out, sizeof(out), "/tmp/out-%d.raw", n);
-        n += 1;
+    processor->reference_memblockq_ss = sample_spec;
+    processor->reference_process_bytes = pa_usec_to_bytes(processor->reference_process_usec, &processor->reference_memblockq_ss);
 
-        unlink(rec);
-        unlink(ref);
-        unlink(out);
+    /* Create memblockq */
+    pa_silence_memchunk_get(&processor->core->silence_cache, processor->core->mempool, &silence, &sample_spec, 0);
 
-        processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777);
-        processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777);
-        processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777);
-    }
-#endif
+    if (processor->reference_memblockq)
+        pa_memblockq_free(processor->reference_memblockq);
 
-    return processor;
+    processor->reference_memblockq = pa_memblockq_new("reference memblockq",
+                                                        0,
+                                                        MEMBLOCKQ_MAXLENGTH,
+                                                        0,
+                                                        &processor->reference_memblockq_ss,
+                                                        0,
+                                                        processor->reference_process_bytes,
+                                                        0,
+                                                        &silence);
+    pa_memblock_unref(silence.memblock);
 
-fail:
-    pa_xfree(processor->intf);
-    pa_xfree(processor);
+    pa_log_debug("Created reference memblockq rate(%d), channels(%d), msec(%u), bytes(%zu)",
+                                                        sample_spec.rate,
+                                                        sample_spec.channels,
+                                                        process_msec,
+                                                        processor->reference_process_bytes);
 
-    return NULL;
+    return 0;
 }
 
-int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) {
-    int ret = -1;
+int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency) {
+    pa_memchunk silence;
+    int64_t write_index, read_index;
+    size_t n, bytes;
 
     pa_assert(processor);
+
+    bytes = pa_usec_to_bytes(latency, &processor->reference_memblockq_ss);
+    n = (bytes + (processor->reference_process_bytes - 1)) / processor->reference_process_bytes;
+
+    pa_silence_memchunk_get(
+            &processor->core->silence_cache,
+            processor->core->mempool,
+            &silence,
+            &processor->reference_memblockq_ss,
+            processor->reference_process_bytes);
+
+    write_index = pa_memblockq_get_write_index(processor->reference_memblockq);
+    read_index = pa_memblockq_get_read_index(processor->reference_memblockq);
+
+    for (; n > 0; n--)
+        pa_memblockq_push(processor->reference_memblockq, &silence);
+
+    pa_memblock_unref(silence.memblock);
+
+    pa_log_info("push n(%u) silence blocks. ref_process_bytes(%zu) "
+                "write_index(%" PRId64 "->%" PRId64 "), read_index(%" PRId64 "->%" PRId64 ")",
+                        pa_memblockq_get_nblocks(processor->reference_memblockq),
+                        processor->reference_process_bytes,
+                        write_index, pa_memblockq_get_write_index(processor->reference_memblockq),
+                        read_index, pa_memblockq_get_read_index(processor->reference_memblockq));
+
+    return 0;
+}
+
+// TODO naming
+int pa_processor_process(pa_processor *processor, pa_memchunk *chunk, pa_memchunk *ochunk) {
+    int r = -1;
+    int8_t *recording = NULL;
+    int8_t *reference = NULL;
+    int8_t *output = NULL;
+    bool silence = false;
+
+    pa_memchunk ichunk, rchunk;
+
+    pa_assert(processor);
+    pa_assert(processor->output_memblockq);
+    pa_assert(processor->reference_memblockq);
     pa_assert(processor->intf);
-    pa_assert(rec);
-    pa_assert(out);
+    pa_assert(processor->process_bytes > 0ULL);
+    pa_assert(processor->reference_process_bytes > 0ULL);
+    pa_assert(chunk);
+    pa_assert(ochunk);
+
+    if ((r = pa_memblockq_push(processor->output_memblockq, chunk)) < 0) {
+        pa_log_error("Failed to push chunk to echo memblockq");
+        return r;
+    }
 
-#ifdef __DEBUG__
-    if (write(processor->fdrec, rec, processor->nframes * processor->framesize) <= 0)
-        pa_log_error("Failed to write rec buffer");
+    if ((r = pa_memblockq_peek_fixed_size(processor->reference_memblockq, processor->reference_process_bytes, &rchunk)) < 0) {
+        pa_log_error("Failed to get memblock from reference memblockq");
+        return r;
+    }
+    silence = pa_memblock_is_silence(rchunk.memblock);
 
-    if (write(processor->fdref, ref, processor->nframes * processor->framesize) <= 0)
-        pa_log_error("Failed to write ref buffer");
+    if ((r = pa_memblockq_peek_fixed_size(processor->output_memblockq, processor->process_bytes, &ichunk)) < 0) {
+        pa_log_error("Failed to get memblock from output memblockq");
+        return r;
+    }
 
-    gettimeofday(&processor->before, NULL);
-#endif
+    ochunk->index = 0;
+    ochunk->length = ichunk.length;
+    ochunk->memblock = pa_memblock_new(processor->core->mempool, ochunk->length);
 
-    if (processor->intf->process)
-        ret = processor->intf->process(processor->priv, rec, ref, out);
+    recording = pa_memblock_acquire(ichunk.memblock);
+    reference = pa_memblock_acquire(rchunk.memblock);
+    output = pa_memblock_acquire(ochunk->memblock);
 
-#ifdef __DEBUG__
-    if (write(processor->fdout, out, processor->nframes * processor->framesize) <= 0)
-        pa_log_error("Failed to write out buffer");
+    debug_timestamp_begin(processor);
 
-    gettimeofday(&processor->after, NULL);
+    r = processor->intf->process(processor->priv, recording, reference, output);
 
-    pa_log_debug("It takes time(%ld) bytes(%d)",
-                    1000 * (after.tv_sec-before.tv_sec) + (after.tv_usec-before.tv_usec) / 1000,
-                    processor->buffer_size);
-#endif
+    debug_timestamp_end(processor);
+    debug_write_file(processor, recording, reference, output);
+
+    pa_memblock_release(ichunk.memblock);
+    pa_memblock_release(rchunk.memblock);
+    pa_memblock_release(ochunk->memblock);
 
-    return ret;
+    pa_memblock_unref(rchunk.memblock);
+    pa_memblockq_drop(processor->reference_memblockq, rchunk.length);
+
+    pa_memblock_unref(ichunk.memblock);
+    pa_memblockq_drop(processor->output_memblockq, ichunk.length);
+
+    pa_log_debug("Post-process. rec(%" PRIu64 "ms), ref(%" PRIu64 "msms) out(%" PRIu64 "ms), "
+                "silence(%d), windex:rindex(%" PRId64 ":%" PRId64 ")",
+                        pa_bytes_to_usec(ichunk.length, processor->output_ss) / PA_USEC_PER_MSEC,
+                        pa_bytes_to_usec(rchunk.length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC,
+                        pa_bytes_to_usec(ochunk->length, processor->output_ss) / PA_USEC_PER_MSEC,
+                        silence,
+                        pa_memblockq_get_write_index(processor->reference_memblockq),
+                        pa_memblockq_get_read_index(processor->reference_memblockq));
+
+    return r;
+}
+
+int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk) {
+    pa_memchunk ochunk;
+    int r;
+
+    pa_assert(processor);
+    pa_assert(chunk);
+
+    if (processor->resampler) {
+        pa_resampler_run(processor->resampler, chunk, &ochunk);
+        chunk = &ochunk;
+    }
+
+    if ((r = pa_memblockq_push(processor->reference_memblockq, chunk)) < 0)
+        pa_log_error("Failed to push chunk to echo memblockq");
+
+    if (processor->resampler)
+        pa_memblock_unref(chunk->memblock);
+
+    pa_log_debug("Pushed echo data. bytes(%zu), msec(%" PRIu64 "ms), nblocks(%d) index(%" PRId64 ":%" PRId64 ")",
+                    chunk->length,
+                    pa_bytes_to_usec(chunk->length, &processor->reference_memblockq_ss) / PA_USEC_PER_MSEC,
+                    pa_memblockq_get_nblocks(processor->reference_memblockq),
+                    pa_memblockq_get_write_index(processor->reference_memblockq),
+                    pa_memblockq_get_read_index(processor->reference_memblockq));
+
+    return r;
+}
+
+void pa_processor_flush(pa_processor *processor) {
+    pa_assert(processor);
+
+    if (processor->reference_memblockq)
+        pa_memblockq_flush_read(processor->reference_memblockq);
+
+    if (processor->output_memblockq)
+        pa_memblockq_flush_read(processor->output_memblockq);
 }
 
 int pa_processor_free(pa_processor *processor) {
     pa_assert(processor);
+    pa_assert(processor->priv);
     pa_assert(processor->intf);
-    pa_assert(processor->intf->destroy);
-
-#ifdef __DEBUG__
-    if (processor->fdrec)
-        close(processor->fdrec);
-    if (processor->fdref)
-        close(processor->fdref);
-    if (processor->fdout)
-        close(processor->fdout);
-#endif
 
-    if (processor->intf->destroy(processor->priv)) {
+    if (processor->intf->destroy(processor->priv) < 0)
         pa_log_error("Failed to destroy processor");
-        return -1;
-    }
 
-    pa_xfree(processor->intf);
+    if (processor->resampler)
+        pa_resampler_free(processor->resampler);
+
+    if (processor->reference_memblockq)
+        pa_memblockq_free(processor->reference_memblockq);
+
+    if (processor->output_memblockq)
+        pa_memblockq_free(processor->output_memblockq);
+
+    debug_close_file(processor);
+
     pa_xfree(processor);
 
     return 0;
 }
 
-size_t pa_processor_get_blocksize(pa_processor *processor) {
-    pa_assert(processor);
+pa_processor_method_t pa_processor_get_method(const char *request_method, const char *default_method) {
+    const char *selected;
+
+    if (!request_method || !default_method)
+        return PA_PROCESSOR_SPEEX;
+
+    selected = pa_streq(request_method, "default") ? default_method : request_method;
 
-    return processor->nframes;
+    if (pa_streq(selected, "speex"))
+        return PA_PROCESSOR_SPEEX;
+    else if (pa_streq(selected, "adrian"))
+        return PA_PROCESSOR_ADRIAN;
+#ifdef SUPPORT_METHOD_WEBRTC
+    else if (pa_streq(selected, "webrtc"))
+        return PA_PROCESSOR_WEBRTC;
+#endif
+    else if (pa_streq(selected, "reference_copy"))
+        return PA_PROCESSOR_REFERENCE_COPY;
+    else
+        return PA_PROCESSOR_SPEEX;
+}
+
+const char *pa_processor_method_to_string(pa_processor_method_t method) {
+    if (method >= PA_PROCESSOR_METHOD_MAX)
+        return NULL;
+
+    return method_table[method].name;
+}
+
+#ifdef __DEBUG__
+static void debug_open_file(pa_processor *processor) {
+    static int n = 1;
+    char rec[32], ref[32], out[32];
+
+    snprintf(rec, sizeof(rec), "/tmp/rec-%d.raw", n);
+    snprintf(ref, sizeof(ref), "/tmp/ref-%d.raw", n);
+    snprintf(out, sizeof(out), "/tmp/out-%d.raw", n);
+    n += 1;
+
+    unlink(rec);
+    unlink(ref);
+    unlink(out);
+
+    processor->fdrec = open(rec, O_RDWR | O_CREAT | O_TRUNC, 777);
+    processor->fdref = open(ref, O_RDWR | O_CREAT | O_TRUNC, 777);
+    processor->fdout = open(out, O_RDWR | O_CREAT | O_TRUNC, 777);
+}
+
+static void debug_timestamp_begin(pa_processor *processor) {
+    gettimeofday(&processor->before, NULL);
+}
+
+static void debug_timestamp_end(pa_processor *processor) {
+    gettimeofday(&processor->after, NULL);
+
+    pa_log_debug("It takes time (%ld)ms.",
+                    1000 * (processor->after.tv_sec - processor->before.tv_sec)
+                    + (processor->after.tv_usec - processor->before.tv_usec) / 1000);
+}
+
+static void debug_write_file(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out) {
+    if (rec && write(processor->fdrec, rec, processor->process_bytes) <= 0)
+        pa_log_error("Failed to write recording buffer");
+
+    if (ref && write(processor->fdref, ref, processor->reference_process_bytes) <= 0)
+        pa_log_error("Failed to write reference buffer");
+
+    if (out && write(processor->fdout, out, processor->process_bytes) <= 0)
+        pa_log_error("Failed to write ref buffer");
+}
+
+static void debug_close_file(pa_processor *processor) {
+    if (processor->fdrec) {
+        close(processor->fdrec);
+        processor->fdrec = -1;
+    }
+
+    if (processor->fdref) {
+        close(processor->fdref);
+        processor->fdref = -1;
+    }
+
+    if (processor->fdout) {
+        close(processor->fdout);
+        processor->fdout = -1;
+    }
 }
+#endif
+
index fa642ce..fa77e87 100644 (file)
@@ -1,6 +1,3 @@
-#ifndef foopulseprocessorfoo
-#define foopulseprocessorfoo
-
 /***
   This file is part of PulseAudio.
 
   USA.
 ***/
 
+#ifndef foopulseprocessorfoo
+#define foopulseprocessorfoo
+
 #ifdef HAVE_CONFIG_H
 #include <config.h>
 #endif
 
+#include <pulsecore/core.h>
+#include <pulsecore/memblock.h>
+#include <pulse/channelmap.h>
 #include <pulse/sample.h>
 
-//#define __DEBUG__
-
-typedef enum {
-    PA_PROCESSOR_FLAGS_ECHO_CANCEL,
-    PA_PROCESSOR_FLAGS_NOISE_SUPPRESSION,
-} pa_process_flags_t;
-
 typedef enum {
     PA_PROCESSOR_SPEEX,
     PA_PROCESSOR_ADRIAN,
+#ifdef SUPPORT_METHOD_WEBRTC
     PA_PROCESSOR_WEBRTC,
-} pa_processor_algo_t;
+#endif
+    PA_PROCESSOR_REFERENCE_COPY,
+    PA_PROCESSOR_METHOD_MAX,
+} pa_processor_method_t;
 
-typedef struct pa_processor_algo pa_processor_algo;
 typedef struct pa_processor pa_processor;
 
-struct pa_processor {
-    pa_processor_algo *intf;
-    void *priv;
-    void *userdata;
-    size_t nframes;
-    size_t framesize;
-
-#ifdef __DEBUG__
-    int fdrec, fdref, fdout;
-    struct timeval before, after;
-#endif
-};
-
-pa_processor *pa_processor_new(size_t nframes, pa_sample_spec *ss, pa_processor_algo_t backend, pa_process_flags_t flags);
-int pa_processor_process(pa_processor *processor, int8_t *rec, int8_t *ref, int8_t *out);
+pa_processor *pa_processor_new(pa_core *core,
+                                uint32_t process_msec,
+                                pa_sample_spec *output_ss,
+                                pa_channel_map *output_map,
+                                pa_sample_spec *source_ss,
+                                pa_processor_method_t method);
+int pa_processor_bind_reference(pa_processor *processor,
+                                uint32_t process_msec,
+                                pa_sample_spec *reference_ss,
+                                pa_channel_map *reference_chmap);
+int pa_processor_setup_reference_memblockq_padding(pa_processor *processor, pa_usec_t latency);
+int pa_processor_process(pa_processor *processor, pa_memchunk *rec, pa_memchunk *out);
+int pa_processor_push_data(pa_processor *processor, pa_memchunk *chunk);
+void pa_processor_flush(pa_processor *processor);
 int pa_processor_free(pa_processor *processor);
-size_t pa_processor_get_blocksize(pa_processor *processor);
+pa_processor_method_t pa_processor_get_method(const char *requset_method, const char *default_method);
+const char *pa_processor_method_to_string(pa_processor_method_t method);
 
 #endif
+
index 0fc047c..5bd3a61 100644 (file)
@@ -279,19 +279,6 @@ static int source_process_msg(
 
             return 0;
         }
-        case PA_SOURCE_MESSAGE_REMOVE_OUTPUT: {
-            pa_source_output *o = (pa_source_output *)data;
-
-            if (!(o->flags & PA_SOURCE_OUTPUT_ECHO_CANCEL))
-                break;
-
-            if (u->ec_asyncmsgq) {
-                pa_asyncmsgq_send(u->ec_asyncmsgq, u->ec_object,
-                                    PA_ECHO_CANCEL_MESSAGE_SOURCE_OUTPUT_UNLINK, o, 0, NULL);
-            }
-
-            return 0;
-        }
         case PA_SOURCE_MESSAGE_REBUILD_RTPOLL: {
             struct arguments {
                 pa_msgobject *o;