2 This file is part of PulseAudio.
4 Copyright 2023 Jaechul Lee <jcsing.lee@samsung.com>
6 PulseAudio is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published
8 by the Free Software Foundation; either version 2.1 of the License,
9 or (at your option) any later version.
11 PulseAudio is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with PulseAudio; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <pulsecore/macro.h>
27 #include <pulsecore/sink.h>
28 #include <pulsecore/source.h>
29 #include <pulsecore/module.h>
30 #include <pulsecore/core-util.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/thread-mq.h>
35 #include <pulsecore/rtpoll.h>
36 #include <pulsecore/poll.h>
37 #include <pulsecore/namereg.h>
38 #include <pulse/util.h>
39 #include <pulse/timeval.h>
41 #include "processor.h"
42 #include "processor_holder.h"
/* Module metadata. PA_MODULE_LOAD_ONCE(true) marks the module as load-once. */
44 PA_MODULE_AUTHOR("Tizen");
45 PA_MODULE_DESCRIPTION("Tizen Audio Preprocessor");
46 PA_MODULE_VERSION(PACKAGE_VERSION);
47 PA_MODULE_LOAD_ONCE(true);
50 "use_system_reference=<name of method using for reference. [method]=audio-share, filesrc > ");
/* Default processing period in microseconds (10000 us = 10 ms).
 * NOTE(review): build_processor_holder() initialises its period with
 * 10 * PA_USEC_PER_MSEC instead of this macro - confirm it is still needed. */
53 #define DEFAULT_PROCESS_USEC 10000
/* Message object type; pa__init() installs process_msg() as its handler. */
55 typedef struct preprocessor pa_preprocessor;
/* Hook slots returned by pa_hook_connect() in pa__init(). */
60 pa_hook_slot *source_output_fixate_slot;
61 pa_hook_slot *source_output_put_slot;
62 pa_hook_slot *source_output_unlink_slot;
63 pa_hook_slot *source_output_unlink_post_slot;
65 pa_hook_slot *source_output_move_start_slot;
66 pa_hook_slot *source_output_move_finish_slot;
/* thread_mq is set up by pa_thread_mq_init() in pa__init(). The two
 * asyncmsgqs are created there too and registered for reading on the
 * module's rtpoll. */
70 pa_thread_mq thread_mq;
71 pa_asyncmsgq *asyncmsgq_sink;
72 pa_asyncmsgq *asyncmsgq_source;
/* Message object whose process_msg handler is process_msg(). */
74 pa_preprocessor *preprocessor;
/* Set by ADD_OUTPUT; cleared by REMOVE_OUTPUT, DESTROY and TERMINATE.
 * PUSH_DATA, PUSH_REFERENCE and RESET_REFERENCE test it first
 * (presumably to ignore the message while it is false - confirm). */
75 bool enable_in_thread;
/* Set by ADD_OUTPUT and RESET_REFERENCE. The next PUSH_REFERENCE then resets
 * the reference, re-adds latency padding and clears the flag. */
76 bool reset_lazy_reference;
/* Declares pa_preprocessor as a private class derived from pa_msgobject,
 * plus a cast helper for it. */
84 PA_DEFINE_PRIVATE_CLASS(pa_preprocessor, pa_msgobject);
85 #define PA_PREPROCESSOR(o) (pa_preprocessor_cast(o))
/* 16 * 1024 * 1024 (presumably bytes - confirm).
 * NOTE(review): appears unused in this file - confirm. */
87 #define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
/* Argument names passed to pa_modargs_new() in pa__init(). */
89 static const char* const valid_modargs[] = {
/* Inspects the "tizen.version" property of proplist p.
 * The value is fetched with pa_proplist_gets() and parsed as an unsigned
 * integer with pa_atou().
 * NOTE(review): presumably returns true only when the parsed version is 2,
 * and false when the property is missing or unparsable - confirm. */
93 static bool proplist_test_tizen_version2(pa_proplist *p) {
94 const char *tizen_version;
/* Property absent. */
99 if (!(tizen_version = pa_proplist_gets(p, "tizen.version")))
/* A nonzero pa_atou() result takes the parse-failure branch. */
102 if (pa_atou(tizen_version, &version))
/* Delivers a message to o by calling its process_msg handler directly in the
 * calling thread, rather than queueing it on an asyncmsgq.
 * The offset is passed as 0 and the memchunk as NULL. */
111 static void process_msg_main_thread(pa_msgobject *o, int code, void *userdata) {
115 o->process_msg(o, code, userdata, 0UL, NULL);
/* Reads PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE from p and converts it to a
 * duration.
 * The property value is parsed with pa_atou() and passed to
 * pa_bytes_to_usec() together with sample_spec; the result goes to *usec.
 * The caller in build_processor_holder() treats a negative return as failure. */
118 static int proplist_get_fragment_size_usec(pa_proplist *p, pa_sample_spec *sample_spec, pa_usec_t *usec) {
119 const char *prop_fragsize;
123 pa_assert(sample_spec);
/* Property absent. */
126 if (!(prop_fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
/* Property present but not parsable as an unsigned integer. */
129 if (pa_atou(prop_fragsize, &fragsize))
132 *usec = pa_bytes_to_usec(fragsize, sample_spec);
/* Tests whether proplist p carries PA_PROP_MEDIA_PREPROCESSOR_METHOD.
 * source_output_fixate_cb() uses this to pick the streams it handles. */
137 static bool is_preprocessor_source_output(pa_proplist *p) {
140 if (pa_proplist_gets(p, PA_PROP_MEDIA_PREPROCESSOR_METHOD))
/* Tests whether the PA_SOURCE_OUTPUT_PREPROCESSOR bit is set in flags.
 * source_output_fixate_cb() sets that bit on streams it has prepared. */
146 static bool is_preprocessor_marked(pa_source_output_flags_t flags) {
147 if (flags & PA_SOURCE_OUTPUT_PREPROCESSOR)
/* Scans every source-output in c->source_outputs for one with the
 * PA_SOURCE_OUTPUT_PREPROCESSOR flag set.
 * source_output_fixate_cb() calls this before building a holder and warns
 * "Don't allow two instances" when the result is true. */
153 static bool lookup_preprocessor_exist(pa_core *c) {
159 PA_IDXSET_FOREACH(o, c->source_outputs, idx) {
160 pa_source_output_assert_ref(o);
162 if (o->flags & PA_SOURCE_OUTPUT_PREPROCESSOR)
/* Resolves a reference-sink string to a sink.
 * str is parsed as an integer with pa_atoi(); the result is used as an index
 * into c->sinks. build_processor_holder() treats a NULL return as failure. */
169 static pa_sink *convert_reference_str_to_sink(pa_core *c, const char *str) {
/* A negative pa_atoi() result means str is not a valid integer. */
176 if (pa_atoi(str, &idx) < 0)
179 s = pa_idxset_get_by_index(c->sinks, idx);
/* NOTE(review): s->index is printed with %d; if it is uint32_t as in upstream
 * PulseAudio, %u would match the type - confirm. */
183 pa_log_info("Found reference sink(%d, %s)", s->index, s->name);
/* Sends PA_SINK_MESSAGE_PREPROCESSOR_REBUILD_RTPOLL to the sink dst.
 * dst must be a pa_sink (asserted); the message goes through the sink's own
 * asyncmsgq with pa_asyncmsgq_send().
 * When src is non-NULL the message data points at a local argument bundle;
 * when src is NULL the data is NULL.
 * NOTE(review): parameter q is not used in the lines below - confirm that it
 * is stored into the bundle elsewhere. */
188 static int send_message_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsgq *q, pa_processor_holder *holder) {
192 pa_processor_holder *holder;
195 pa_asyncmsgq *asyncmsgq;
199 pa_assert(pa_sink_isinstance(dst));
203 args.holder = holder;
205 asyncmsgq = PA_SINK(dst)->asyncmsgq;
206 code = PA_SINK_MESSAGE_PREPROCESSOR_REBUILD_RTPOLL;
/* NOTE(review): &args is a local; this relies on pa_asyncmsgq_send() not
 * returning before the receiver has processed the message - confirm. */
208 pa_asyncmsgq_send(asyncmsgq, dst, code, src ? (void *)&args : NULL, 0, NULL);
/* Attaches the holder to, or detaches it from, its reference sink.
 * The sink comes from the holder's connected processor reference. A rebuild
 * message is then sent either with (o, q, holder) or with all-NULL
 * arguments; the choice is presumably made on enable - confirm. */
213 static void connect_to_reference_sink(pa_processor_holder *holder, pa_msgobject *o, pa_asyncmsgq *q, bool enable) {
215 pa_processor_reference *reference;
219 reference = pa_processor_holder_get_connected_processor_reference(holder);
223 sink = pa_processor_reference_get_sink(reference);
228 send_message_rebuild_rtpoll(PA_MSGOBJECT(sink), o, q, holder);
230 send_message_rebuild_rtpoll(PA_MSGOBJECT(sink), NULL, NULL, NULL);
/* NOTE(review): the same "connected" wording is logged for enable == false;
 * the enable(%d) field is what tells the two cases apart. */
232 pa_log_info("connected to reference sink. enable(%d), sink(%s)", enable, sink->name);
/* Tears down the preprocessor on every marked source-output.
 * For each output in u->core->source_outputs with the preprocessor flag:
 * detach from the reference sink, send PREPROCESSOR_TERMINATE to the
 * holder's current source, free the holder, clear the output's holder and
 * preprocess pointers, and drop the flag.
 * Called from the PREPROCESSOR_TERMINATE case of process_msg().
 * NOTE(review): pa__done() delivers that message through u->thread_mq.inq, so
 * this walk over core state presumably runs outside the main thread - confirm
 * that is safe. */
235 static void terminate_preprocessor_holder(struct userdata *u) {
236 pa_processor_holder *holder;
243 PA_IDXSET_FOREACH(o, u->core->source_outputs, idx) {
244 pa_source_output_assert_ref(o);
246 if (is_preprocessor_marked(o->flags)) {
247 holder = (pa_processor_holder *)o->thread_info.processor_holder;
/* NOTE(review): holder is used without a NULL check here, while the hook
 * callbacks handle a NULL holder on marked outputs - confirm it cannot be
 * NULL at this point. */
248 connect_to_reference_sink(holder, NULL, NULL, false);
250 source = pa_processor_holder_get_current_source(holder);
252 pa_asyncmsgq_send(source->asyncmsgq, PA_MSGOBJECT(source), PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE, NULL, 0, NULL);
254 pa_processor_holder_free(holder);
255 o->thread_info.processor_holder = NULL;
256 o->preprocess = NULL;
258 o->flags &= ~PA_SOURCE_OUTPUT_PREPROCESSOR;
/* Returns the sum of the sink latency and the source latency.
 * Each value is queried with a GET_LATENCY message sent to the device's own
 * asyncmsgq; both start at 0.
 * process_msg() uses the result as latency padding for the reference. */
263 static pa_usec_t get_round_trip_latency(pa_source *source, pa_sink *sink) {
264 pa_usec_t sink_latency = 0ULL;
265 pa_usec_t source_latency = 0ULL;
270 pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_latency, 0, NULL);
271 pa_asyncmsgq_send(source->asyncmsgq, PA_MSGOBJECT(source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_latency, 0, NULL);
273 pa_log_info("sink latency (%" PRIu64 "), source latency(%" PRIu64 ")", sink_latency, source_latency);
275 return sink_latency + source_latency;
/* Builds a processor holder for a new source-output.
 * The comma-separated list in PA_PROP_MEDIA_PREPROCESSOR_METHOD names the
 * processors; one pa_processor is created per entry and registered on the
 * holder in list order. Methods that need a reference also get a
 * pa_processor_reference bound to the sink named by
 * PA_PROP_MEDIA_ECHO_CANCEL_REFERENCE_SINK.
 * The processing period defaults to 10 ms and is replaced by the source's
 * fragment duration when that property can be read.
 * The caller, source_output_fixate_cb(), treats a NULL return as failure. */
278 static pa_processor_holder *build_processor_holder(pa_core *core, pa_source_output_new_data *data) {
279 const char *state = NULL;
280 const char *processors_list;
281 char *processor_str = NULL;
283 pa_processor_holder *holder;
284 pa_usec_t process_usec = 10 * PA_USEC_PER_MSEC;
289 holder = pa_processor_holder_new(core, &data->sample_spec);
291 pa_log_error("Failed to allocate pa_processor_holder");
/* The caller has already checked that this property exists. */
295 processors_list = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_PREPROCESSOR_METHOD);
296 pa_assert(processors_list);
/* Best effort: on failure only a warning is logged. */
298 if (proplist_get_fragment_size_usec(data->source->proplist, &data->source->sample_spec, &process_usec) < 0)
299 pa_log_warn("Failed to get source fragment usec. use default process usec");
301 pa_log_info("request to build processors_list(%s)", processors_list);
/* pa_split() yields one heap-allocated token per iteration; it is released
 * with pa_xfree() at the end of the loop body. */
303 while ((processor_str = pa_split(processors_list, ",", &state))) {
304 pa_processor *processor;
305 pa_processor_method_t method;
307 if (pa_processor_method_enum(processor_str, &method) < 0) {
308 pa_log_error("Failed to get method. processor_str(%s)", processor_str);
/* The period is handed to pa_processor_new() in milliseconds. */
312 processor = pa_processor_new(core, process_usec / PA_USEC_PER_MSEC,
313 &data->sample_spec, method);
315 pa_log_error("Failed to create pa_processor. preprocessor(aec) will be disabled");
320 if (pa_processor_method_need_reference_structure(method)) {
321 pa_sample_spec request_ss;
322 pa_processor_reference *reference;
323 const char *ref_sink;
/* NOTE(review): the later failure paths call pa_processor_free(processor),
 * but no such call is visible beside the two error logs below - confirm
 * that the processor is not leaked. */
326 ref_sink = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_ECHO_CANCEL_REFERENCE_SINK);
328 pa_log_error("Failed to get ref sink");
332 sink = convert_reference_str_to_sink(core, ref_sink);
334 pa_log_error("Failed to convert sink");
/* The reference is requested in the stream's sample spec. For
 * REFERENCE_COPY it is forced to one channel and remapping is disabled on
 * the source-output. */
338 request_ss = data->sample_spec;
339 if (method == PROCESSOR_METHOD_REFERENCE_COPY) {
340 request_ss.channels = 1;
341 data->flags |= PA_SOURCE_OUTPUT_NO_REMAP;
344 reference = pa_processor_reference_new_custom(core, sink, &sink->sample_spec,
345 &request_ss, process_usec,
346 PROCESSOR_REFERENCE_METHOD_NONE);
348 pa_processor_free(processor);
349 pa_log_error("Failed to create reference custom. processor_str(%s)", processor_str);
353 /* holder -> reference -> processor */
354 pa_processor_attach_reference(processor, reference);
355 if (pa_processor_holder_connect_reference(holder, reference) < 0) {
356 pa_processor_reference_free(reference);
357 pa_processor_free(processor);
358 pa_log_error("Failed to connect holder to reference");
363 pa_processor_holder_register_processor_sequencial(holder, processor);
/* NOTE(review): the argument has type pa_usec_t, which this file prints with
 * PRIu64 elsewhere; %lu does not match on targets where long is 32 bits. */
365 pa_log_info("processor was created. processor(%s), process_msec(%lu)",
366 processor_str, process_usec / PA_USEC_PER_MSEC);
368 pa_xfree(processor_str);
/* Failure cleanup: release the holder and the token that was being handled. */
375 pa_processor_holder_free(holder);
377 pa_xfree(processor_str);
/* Releases the processor holder attached to source-output o, then clears the
 * output's preprocess callback and holder pointer.
 * Called from the PREPROCESSOR_DESTROY case of process_msg(). */
382 static void destroy_source_output_preprocessor(pa_source_output *o) {
383 pa_processor_holder *holder;
387 holder = (pa_processor_holder *)o->thread_info.processor_holder;
389 pa_processor_holder_free(holder);
390 o->thread_info.processor_holder = NULL;
393 o->preprocess = NULL;
/* NOTE(review): the holder pointer is also cleared a few lines above; this
 * second assignment covers the path where no holder was attached. */
394 o->thread_info.processor_holder = NULL;
/* Per-chunk preprocess callback; process_msg() installs it on a source-output
 * when handling ADD_OUTPUT.
 * The input chunk is pushed into the holder, the holder is pumped, and the
 * result is pulled into ochunk.
 * A pump result other than PROCESSOR_OK is logged as a warning, except for
 * -PROCESSOR_ERR_BUFFERING, which is silent. */
397 static int preprocess(pa_source_output *o, pa_memchunk *chunk, pa_memchunk *ochunk) {
398 pa_processor_holder *holder;
404 pa_assert(o->thread_info.processor_holder);
406 holder = (pa_processor_holder *)o->thread_info.processor_holder;
408 /* chunk must contain resampled sound pcm */
409 pa_processor_holder_push_data(holder, chunk);
411 ret = pa_processor_holder_pump(holder);
412 if (ret != PROCESSOR_OK) {
413 if (ret != -PROCESSOR_ERR_BUFFERING)
414 pa_log_warn("Failed to pump holder. ret(%x)", ret);
419 pa_processor_holder_pull_data(holder, ochunk);
/* Message handler of the pa_preprocessor object; pa__init() installs it.
 * It is reached in two ways: through the message queues polled in
 * thread_func(), and by direct calls from process_msg_main_thread().
 *
 * Codes handled:
 *   ADD_OUTPUT       - bind the holder to output->source, install
 *                      preprocess(), enable processing, request a reference
 *                      reset.
 *   REMOVE_OUTPUT    - unbind the holder's source, remove preprocess(),
 *                      disable processing.
 *   PUSH_DATA        - post chunk on the source passed in data.
 *   PUSH_REFERENCE   - feed chunk to the holder as reference data; the first
 *                      chunk after a reset request also resets the reference
 *                      and adds round-trip latency padding.
 *   RESET_REFERENCE  - request a reference reset on the next PUSH_REFERENCE.
 *   DESTROY          - free the output's holder and disable processing.
 *   TERMINATE        - tear down all marked outputs and disable processing. */
424 /* rendering thread is separated because ec/ns takes much time in I/O thread */
425 static int process_msg(
430 pa_memchunk *chunk) {
432 struct userdata *u = PA_PREPROCESSOR(o)->u;
435 case PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT: {
436 pa_source_output *output;
437 pa_processor_holder *holder;
439 output = PA_SOURCE_OUTPUT(data);
442 holder = (pa_processor_holder *)output->thread_info.processor_holder;
445 pa_processor_holder_set_current_source(holder, output->source);
446 output->preprocess = preprocess;
447 u->enable_in_thread = true;
448 u->reset_lazy_reference = true;
450 pa_processor_holder_dump(holder);
452 pa_log_info("source-output with a preprocessor will be added");
456 case PA_SOURCE_MESSAGE_PREPROCESSOR_REMOVE_OUTPUT: {
457 pa_source_output *output;
458 pa_processor_holder *holder;
460 output = PA_SOURCE_OUTPUT(data);
463 holder = (pa_processor_holder *)output->thread_info.processor_holder;
466 pa_processor_holder_set_current_source(holder, NULL);
467 output->preprocess = NULL;
468 u->enable_in_thread = false;
470 pa_processor_holder_dump(holder);
472 pa_log_info("source-output with a preprocessor will be removed");
476 case PA_SOURCE_MESSAGE_PREPROCESSOR_PUSH_DATA:
/* Tested before anything else in this case (presumably to ignore the message
 * while processing is disabled - confirm). */
477 if (!u->enable_in_thread)
480 pa_source *source = PA_SOURCE(data);
481 pa_source_post(source, chunk);
484 case PA_SOURCE_MESSAGE_PREPROCESSOR_PUSH_REFERENCE: {
485 pa_processor_holder *holder = (pa_processor_holder *)data;
488 pa_processor_reference *reference;
490 if (!u->enable_in_thread)
493 reference = pa_processor_holder_get_connected_processor_reference(holder);
494 pa_assert(reference);
496 sink = pa_processor_reference_get_sink(reference);
497 source = pa_processor_holder_get_current_source(holder);
501 /* first reference memchunk after connecting source-output */
502 if (u->reset_lazy_reference) {
503 pa_processor_reference_reset(reference);
/* get_round_trip_latency() queries both devices through their asyncmsgqs. */
504 pa_processor_reference_add_latency_padding(reference, get_round_trip_latency(source, sink));
506 u->reset_lazy_reference = false;
509 if (pa_processor_holder_push_reference_data(holder, chunk) < 0)
510 pa_log_error("Failed to push reference data");
514 case PA_SOURCE_MESSAGE_PREPROCESSOR_RESET_REFERENCE:
515 if (!u->enable_in_thread)
/* The reset itself is deferred to the next PUSH_REFERENCE. */
519 u->reset_lazy_reference = true;
522 case PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY: {
523 pa_source_output *output;
525 output = PA_SOURCE_OUTPUT(data);
528 destroy_source_output_preprocessor(output);
530 u->enable_in_thread = false;
534 case PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE:
535 terminate_preprocessor_holder(u);
536 u->enable_in_thread = false;
/* SOURCE_OUTPUT_FIXATE hook: prepares a preprocessor for a new source-output.
 * Streams without PA_PROP_MEDIA_PREPROCESSOR_METHOD are not handled. If a
 * preprocessor source-output already exists, a warning is logged. Otherwise
 * a holder is built, stored in data->processor_holder, the stream is flagged
 * PA_SOURCE_OUTPUT_PREPROCESSOR, and the holder receives the module's
 * message object and source asyncmsgq.
 * Returns PA_HOOK_CANCEL when the holder cannot be built. */
546 static pa_hook_result_t source_output_fixate_cb(pa_core *c, pa_source_output_new_data *data, void *userdata) {
547 struct userdata *u = (struct userdata *)userdata;
548 pa_processor_holder *holder;
554 if (!is_preprocessor_source_output(data->proplist))
557 if (lookup_preprocessor_exist(c)) {
558 pa_log_warn("Don't allow two instances");
562 holder = build_processor_holder(c, data);
564 pa_log_error("Failed to build holder");
565 return PA_HOOK_CANCEL;
/* NOTE(review): the holder-build failure was already handled above, so the
 * NULL test after this assignment looks redundant - confirm. */
568 data->processor_holder = (void *)holder;
569 if (!data->processor_holder) {
570 pa_log_error("Failed to create processor_holder");
574 data->flags |= PA_SOURCE_OUTPUT_PREPROCESSOR;
576 pa_processor_holder_set_private_data(holder, u->preprocessor, u->asyncmsgq_source);
/* Failure cleanup: drop the flag again and release the holder. */
581 data->flags &= ~ PA_SOURCE_OUTPUT_PREPROCESSOR;
582 pa_processor_holder_free(holder);
/* SOURCE_OUTPUT_PUT hook: activates the preprocessor on a marked
 * source-output.
 * Unmarked outputs are not handled; a missing holder is logged as an error.
 * If the source's proplist does not report tizen version 2, ADD_OUTPUT is
 * handled in the calling thread before the reference sink is connected.
 * Otherwise only the reference sink connection is made here. */
587 static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, void *userdata) {
588 struct userdata *u = (struct userdata *)userdata;
589 pa_processor_holder *holder;
595 if (!is_preprocessor_marked(o->flags))
598 holder = (pa_processor_holder *)o->thread_info.processor_holder;
600 pa_log_error("Failed to get processor holder");
604 if (!proplist_test_tizen_version2(o->source->proplist)) {
605 pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
606 pa_assert(msgobject);
608 process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT, o);
610 connect_to_reference_sink(holder, PA_MSGOBJECT(u->preprocessor), u->asyncmsgq_sink, true);
615 connect_to_reference_sink(holder, PA_MSGOBJECT(u->preprocessor), u->asyncmsgq_sink, true);
/* SOURCE_OUTPUT_UNLINK hook: tears down the preprocessor of a marked
 * source-output whose source does not report tizen version 2.
 * Unmarked outputs are not handled; a missing holder is logged as an error.
 * DESTROY is handled in the calling thread, then the reference sink is
 * disconnected. */
620 static pa_hook_result_t source_output_unlink_cb(pa_core *c, pa_source_output *o, void *userdata) {
621 struct userdata *u = (struct userdata *)userdata;
622 pa_processor_holder *holder;
628 if (!is_preprocessor_marked(o->flags))
631 holder = (pa_processor_holder *)o->thread_info.processor_holder;
633 pa_log_error("Failed to get processor holder");
637 /* in case of normal source(not tizen2)
638 * This should be destroy in unlink process
639 * because o->source wouldn't exist in the unlink_post step. */
640 if (!proplist_test_tizen_version2(o->source->proplist)) {
641 pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
642 pa_assert(msgobject);
/* NOTE(review): DESTROY frees the holder through
 * destroy_source_output_preprocessor(), yet the local holder pointer is then
 * passed to connect_to_reference_sink(). That looks like a use-after-free.
 * source_output_unlink_post_cb() disconnects before DESTROY - confirm whether
 * the two calls below should be swapped. */
644 process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY, o);
646 connect_to_reference_sink(holder, NULL, NULL, false);
/* SOURCE_OUTPUT_UNLINK_POST hook: tears down the preprocessor of a marked
 * source-output that still has a holder at this stage.
 * The reference sink is disconnected first; DESTROY is then sent through
 * u->thread_mq.inq with pa_asyncmsgq_send(), so the holder is freed by the
 * message handler and not directly in this callback.
 * A missing holder is only logged at info level. */
652 /* This function will be also called when source unlink */
653 static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
654 struct userdata *u = (struct userdata *)userdata;
655 pa_processor_holder *holder;
661 if (!is_preprocessor_marked(o->flags))
664 holder = (pa_processor_holder *)o->thread_info.processor_holder;
666 pa_log_info("holder doesn't exist. source-output might not be connected to tizen2 source");
670 connect_to_reference_sink(holder, NULL, NULL, false);
672 pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->preprocessor),
673 PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY, (void *)o, 0, NULL);
/* SOURCE_OUTPUT_MOVE_START hook: if the source being left does not report
 * tizen version 2, REMOVE_OUTPUT is handled in the calling thread.
 * NOTE(review): no is_preprocessor_marked() test is visible here, unlike the
 * put/unlink hooks - confirm that unmarked outputs are filtered out before
 * the REMOVE_OUTPUT handler dereferences their holder. */
678 static pa_hook_result_t source_output_move_start_cb(pa_core *c, pa_source_output *o, void *userdata) {
679 struct userdata *u = (struct userdata *)userdata;
685 if (!proplist_test_tizen_version2(o->source->proplist)) {
686 pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
687 pa_assert(msgobject);
689 process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_REMOVE_OUTPUT, o);
/* SOURCE_OUTPUT_MOVE_FINISH hook: if the source now attached does not report
 * tizen version 2, ADD_OUTPUT is handled in the calling thread.
 * NOTE(review): no is_preprocessor_marked() test is visible here, unlike the
 * put/unlink hooks - confirm that unmarked outputs are filtered out before
 * the ADD_OUTPUT handler dereferences their holder. */
695 static pa_hook_result_t source_output_move_finish_cb(pa_core *c, pa_source_output *o, void *userdata) {
696 struct userdata *u = (struct userdata *)userdata;
702 if (!proplist_test_tizen_version2(o->source->proplist)) {
703 pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
704 pa_assert(msgobject);
706 process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT, o);
/* Entry point of the "tizenaudio-preprocessor" thread started in pa__init().
 * It installs the module's thread_mq and runs the rtpoll.
 * If pa_rtpoll_run() returns a negative value, the thread asks the core to
 * unload the module and then waits for PA_MESSAGE_SHUTDOWN. */
712 static void thread_func(void *userdata) {
713 struct userdata *u = (struct userdata *)userdata;
717 pa_log_debug("Thread starting up");
/* Real-time scheduling is compiled out; see the FIXME on the next line. */
719 #if 0 // FIXME: RT disabled due to segfault when invoking tflite
720 if (u->core->realtime_scheduling)
721 pa_thread_make_realtime(u->core->realtime_priority);
724 pa_thread_mq_install(&u->thread_mq);
729 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
734 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core),
735 PA_CORE_MESSAGE_UNLOAD_MODULE, u->m, 0, NULL, NULL);
736 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
/* NOTE(review): pa__done() also calls pa_thread_mq_done() on this thread_mq -
 * confirm that tearing it down from both places is intended. */
738 pa_thread_mq_done(&u->thread_mq);
740 pa_log_debug("Thread shutting down");
/* Module entry point.
 * Parses the module arguments and allocates the zero-initialised userdata.
 * Creates the pa_preprocessor message object with process_msg() as handler.
 * Creates the rtpoll and two asyncmsgqs, registers both queues for reading on
 * the rtpoll, and initialises the thread_mq on the core mainloop.
 * Starts the worker thread and connects six source-output hooks. */
743 int pa__init(pa_module *m) {
744 pa_modargs *ma = NULL;
745 struct userdata *u = NULL;
749 if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
750 pa_log_error("Failed to parse module arguments.");
754 m->userdata = u = pa_xnew0(struct userdata, 1);
758 u->preprocessor = pa_msgobject_new(pa_preprocessor);
759 u->preprocessor->parent.process_msg = process_msg;
760 u->preprocessor->u = u;
762 u->rtpoll = pa_rtpoll_new();
763 u->asyncmsgq_source = pa_asyncmsgq_new(0);
764 u->asyncmsgq_sink = pa_asyncmsgq_new(0);
/* Both queues are read by the worker thread through the rtpoll. */
766 pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY, u->asyncmsgq_sink);
767 pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY, u->asyncmsgq_source);
768 pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
770 if (!(u->thread = pa_thread_new("tizenaudio-preprocessor", thread_func, u))) {
771 pa_log_error("Failed to create thread.");
/* The two unlink hooks use PA_HOOK_EARLY; the other hooks use PA_HOOK_LATE. */
775 u->source_output_unlink_slot =
776 pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK],
777 PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_cb, u);
779 u->source_output_unlink_post_slot =
780 pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST],
781 PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_post_cb, u);
783 /* source_output_fixate_cb must be called after new_cb callback in stream manager
784 * because stream-manager converts the device_id to the index of the sink */
785 u->source_output_fixate_slot =
786 pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_FIXATE],
787 PA_HOOK_LATE, (pa_hook_cb_t) source_output_fixate_cb, u);
789 u->source_output_put_slot =
790 pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT],
791 PA_HOOK_LATE, (pa_hook_cb_t) source_output_put_cb, u);
793 u->source_output_move_start_slot =
794 pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_START],
795 PA_HOOK_LATE, (pa_hook_cb_t) source_output_move_start_cb, u);
797 u->source_output_move_finish_slot =
798 pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FINISH],
799 PA_HOOK_LATE, (pa_hook_cb_t) source_output_move_finish_cb, u);
801 /* TODO : need to check sink configuration change */
/* Module teardown.
 * Does nothing when m->userdata is unset. Otherwise it sends
 * PREPROCESSOR_TERMINATE through u->thread_mq.inq, then releases hook slots,
 * both asyncmsgqs, the rtpoll and the thread_mq.
 * NOTE(review): pa__init() connects six hook slots, but only unlink_post,
 * fixate and put are freed below. source_output_unlink_slot,
 * source_output_move_start_slot and source_output_move_finish_slot appear to
 * stay connected after unload, with callbacks that still receive u - confirm
 * and free them here.
 * NOTE(review): no PA_MESSAGE_SHUTDOWN send or pa_thread_free() for u->thread
 * is visible before the rtpoll and thread_mq are destroyed - confirm the
 * worker thread is stopped first. */
815 void pa__done(pa_module *m) {
820 if (!(u = m->userdata))
823 pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->preprocessor),
824 PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE, NULL, 0, NULL);
826 if (u->source_output_unlink_post_slot)
827 pa_hook_slot_free(u->source_output_unlink_post_slot);
829 if (u->source_output_fixate_slot)
830 pa_hook_slot_free(u->source_output_fixate_slot);
832 if (u->source_output_put_slot)
833 pa_hook_slot_free(u->source_output_put_slot);
835 if (u->asyncmsgq_sink)
836 pa_asyncmsgq_unref(u->asyncmsgq_sink);
838 if (u->asyncmsgq_source)
839 pa_asyncmsgq_unref(u->asyncmsgq_source);
842 pa_rtpoll_free(u->rtpoll);
/* NOTE(review): thread_func() also calls pa_thread_mq_done() on this
 * thread_mq - confirm it cannot run twice. */
844 pa_thread_mq_done(&u->thread_mq);