preprocess: Support source and source2 both
[platform/core/multimedia/pulseaudio-modules-tizen.git] / src / preprocessor / module-tizenaudio-preprocessor.c
1 /***
2   This file is part of PulseAudio.
3
4   Copyright 2023 Jaechul Lee <jcsing.lee@samsung.com>
5
6   PulseAudio is free software; you can redistribute it and/or modify
7   it under the terms of the GNU Lesser General Public License as published
8   by the Free Software Foundation; either version 2.1 of the License,
9   or (at your option) any later version.
10
11   PulseAudio is distributed in the hope that it will be useful, but
12   WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14   General Public License for more details.
15
16   You should have received a copy of the GNU Lesser General Public License
17   along with PulseAudio; if not, write to the Free Software
18   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19   USA.
20 ***/
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include <pulsecore/macro.h>
27 #include <pulsecore/sink.h>
28 #include <pulsecore/source.h>
29 #include <pulsecore/module.h>
30 #include <pulsecore/core-util.h>
31 #include <pulsecore/modargs.h>
32 #include <pulsecore/log.h>
33 #include <pulsecore/thread.h>
34 #include <pulsecore/thread-mq.h>
35 #include <pulsecore/rtpoll.h>
36 #include <pulsecore/poll.h>
37 #include <pulsecore/namereg.h>
38 #include <pulse/util.h>
39 #include <pulse/timeval.h>
40
41 #include "processor.h"
42 #include "processor_holder.h"
43
44 PA_MODULE_AUTHOR("Tizen");
45 PA_MODULE_DESCRIPTION("Tizen Audio Preprocessor");
46 PA_MODULE_VERSION(PACKAGE_VERSION);
47 PA_MODULE_LOAD_ONCE(true);
48 /*
49 PA_MODULE_USAGE(
50         "use_system_reference=<name of method using for reference. [method]=audio-share, filesrc > ");
51         */
52
53 #define DEFAULT_PROCESS_USEC 10000
54
55 typedef struct preprocessor pa_preprocessor;
56 struct userdata {
57     pa_core *core;
58     pa_module *m;
59
60     pa_hook_slot *source_output_fixate_slot;
61     pa_hook_slot *source_output_put_slot;
62     pa_hook_slot *source_output_unlink_slot;
63     pa_hook_slot *source_output_unlink_post_slot;
64
65     pa_hook_slot *source_output_move_start_slot;
66     pa_hook_slot *source_output_move_finish_slot;
67
68     pa_rtpoll *rtpoll;
69     pa_thread *thread;
70     pa_thread_mq thread_mq;
71     pa_asyncmsgq *asyncmsgq_sink;
72     pa_asyncmsgq *asyncmsgq_source;
73
74     pa_preprocessor *preprocessor;
75     bool enable_in_thread;
76     bool reset_lazy_reference;
77 };
78
79 struct preprocessor {
80     pa_msgobject parent;
81     struct userdata *u;
82 };
83
84 PA_DEFINE_PRIVATE_CLASS(pa_preprocessor, pa_msgobject);
85 #define PA_PREPROCESSOR(o) (pa_preprocessor_cast(o))
86
87 #define MEMBLOCKQ_MAXLENGTH (16 * 1024 * 1024)
88
89 static const char* const valid_modargs[] = {
90     NULL,
91 };
92
93 static bool proplist_test_tizen_version2(pa_proplist *p) {
94     const char *tizen_version;
95     uint32_t version;
96
97     pa_assert(p);
98
99     if (!(tizen_version = pa_proplist_gets(p, "tizen.version")))
100         return false;
101
102     if (pa_atou(tizen_version, &version))
103         return false;
104
105     if (version < 2)
106         return false;
107
108     return true;
109 }
110
111 static void process_msg_main_thread(pa_msgobject *o, int code, void *userdata) {
112     pa_assert(o);
113     pa_assert(userdata);
114
115     o->process_msg(o, code, userdata, 0UL, NULL);
116 }
117
118 static int proplist_get_fragment_size_usec(pa_proplist *p, pa_sample_spec *sample_spec, pa_usec_t *usec) {
119     const char *prop_fragsize;
120     uint32_t fragsize;
121
122     pa_assert(p);
123     pa_assert(sample_spec);
124     pa_assert(usec);
125
126     if (!(prop_fragsize = pa_proplist_gets(p, PA_PROP_DEVICE_BUFFERING_FRAGMENT_SIZE)))
127         return -1;
128
129     if (pa_atou(prop_fragsize, &fragsize))
130         return -1;
131
132     *usec = pa_bytes_to_usec(fragsize, sample_spec);
133
134     return 0;
135 }
136
137 static bool is_preprocessor_source_output(pa_proplist *p) {
138     pa_assert(p);
139
140     if (pa_proplist_gets(p, PA_PROP_MEDIA_PREPROCESSOR_METHOD))
141         return true;
142
143     return false;
144 }
145
146 static bool is_preprocessor_marked(pa_source_output_flags_t flags) {
147     if (flags & PA_SOURCE_OUTPUT_PREPROCESSOR)
148         return true;
149
150     return false;
151 }
152
153 static bool lookup_preprocessor_exist(pa_core *c) {
154     pa_source_output *o;
155     uint32_t idx;
156
157     pa_assert(c);
158
159     PA_IDXSET_FOREACH(o, c->source_outputs, idx) {
160         pa_source_output_assert_ref(o);
161
162         if (o->flags & PA_SOURCE_OUTPUT_PREPROCESSOR)
163             return true;
164     }
165
166     return false;
167 }
168
169 static pa_sink *convert_reference_str_to_sink(pa_core *c, const char *str) {
170     int32_t idx;
171     pa_sink *s;
172
173     pa_assert(c);
174     pa_assert(str);
175
176     if (pa_atoi(str, &idx) < 0)
177         return NULL;
178
179     s = pa_idxset_get_by_index(c->sinks, idx);
180     if (!s)
181         return NULL;
182
183     pa_log_info("Found reference sink(%d, %s)", s->index, s->name);
184
185     return s;
186 }
187
188 static int send_message_rebuild_rtpoll(pa_msgobject *dst, pa_msgobject *src, pa_asyncmsgq *q, pa_processor_holder *holder) {
189     struct arguments {
190         pa_msgobject *o;
191         pa_asyncmsgq *q;
192         pa_processor_holder *holder;
193     } args;
194
195     pa_asyncmsgq *asyncmsgq;
196     int code;
197
198     pa_assert(dst);
199     pa_assert(pa_sink_isinstance(dst));
200
201     args.o = src;
202     args.q = q;
203     args.holder = holder;
204
205     asyncmsgq = PA_SINK(dst)->asyncmsgq;
206     code = PA_SINK_MESSAGE_PREPROCESSOR_REBUILD_RTPOLL;
207
208     pa_asyncmsgq_send(asyncmsgq, dst, code, src ? (void *)&args : NULL, 0, NULL);
209
210     return 0;
211 }
212
213 static void connect_to_reference_sink(pa_processor_holder *holder, pa_msgobject *o, pa_asyncmsgq *q, bool enable) {
214     pa_sink *sink;
215     pa_processor_reference *reference;
216
217     pa_assert(holder);
218
219     reference = pa_processor_holder_get_connected_processor_reference(holder);
220     if (!reference)
221         return;
222
223     sink = pa_processor_reference_get_sink(reference);
224     if (!sink)
225         return;
226
227     if (enable)
228         send_message_rebuild_rtpoll(PA_MSGOBJECT(sink), o, q, holder);
229     else
230         send_message_rebuild_rtpoll(PA_MSGOBJECT(sink), NULL, NULL, NULL);
231
232     pa_log_info("connected to reference sink. enable(%d), sink(%s)", enable, sink->name);
233 }
234
235 static void terminate_preprocessor_holder(struct userdata *u) {
236     pa_processor_holder *holder;
237     pa_source_output *o;
238     pa_source *source;
239     uint32_t idx;
240
241     pa_assert(u);
242
243     PA_IDXSET_FOREACH(o, u->core->source_outputs, idx) {
244         pa_source_output_assert_ref(o);
245
246         if (is_preprocessor_marked(o->flags)) {
247             holder = (pa_processor_holder *)o->thread_info.processor_holder;
248             connect_to_reference_sink(holder, NULL, NULL, false);
249
250             source = pa_processor_holder_get_current_source(holder);
251             pa_assert(source);
252             pa_asyncmsgq_send(source->asyncmsgq, PA_MSGOBJECT(source), PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE, NULL, 0, NULL);
253
254             pa_processor_holder_free(holder);
255             o->thread_info.processor_holder = NULL;
256             o->preprocess = NULL;
257
258             o->flags &= ~PA_SOURCE_OUTPUT_PREPROCESSOR;
259         }
260     }
261 }
262
263 static pa_usec_t get_round_trip_latency(pa_source *source, pa_sink *sink) {
264     pa_usec_t sink_latency = 0ULL;
265     pa_usec_t source_latency = 0ULL;
266
267     pa_assert(sink);
268     pa_assert(source);
269
270     pa_asyncmsgq_send(sink->asyncmsgq, PA_MSGOBJECT(sink), PA_SINK_MESSAGE_GET_LATENCY, &sink_latency, 0, NULL);
271     pa_asyncmsgq_send(source->asyncmsgq, PA_MSGOBJECT(source), PA_SOURCE_MESSAGE_GET_LATENCY, &source_latency, 0, NULL);
272
273     pa_log_info("sink latency (%" PRIu64 "), source latency(%" PRIu64 ")", sink_latency, source_latency);
274
275     return sink_latency + source_latency;
276 }
277
278 static pa_processor_holder *build_processor_holder(pa_core *core, pa_source_output_new_data *data) {
279     const char *state = NULL;
280     const char *processors_list;
281     char *processor_str = NULL;
282
283     pa_processor_holder *holder;
284     pa_usec_t process_usec = 10 * PA_USEC_PER_MSEC;
285
286     pa_assert(core);
287     pa_assert(data);
288
289     holder = pa_processor_holder_new(core, &data->sample_spec);
290     if (!holder) {
291         pa_log_error("Failed to allocate pa_processor_holder");
292         return NULL;
293     }
294
295     processors_list = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_PREPROCESSOR_METHOD);
296     pa_assert(processors_list);
297
298     if (proplist_get_fragment_size_usec(data->source->proplist, &data->source->sample_spec, &process_usec) < 0)
299         pa_log_warn("Failed to get source fragment usec. use default process usec");
300
301     pa_log_info("request to build processors_list(%s)", processors_list);
302
303     while ((processor_str = pa_split(processors_list, ",", &state))) {
304         pa_processor *processor;
305         pa_processor_method_t method;
306
307         if (pa_processor_method_enum(processor_str, &method) < 0) {
308             pa_log_error("Failed to get method. processor_str(%s)", processor_str);
309             goto fail;
310         }
311
312         processor = pa_processor_new(core, process_usec / PA_USEC_PER_MSEC,
313                                             &data->sample_spec, method);
314         if (!processor) {
315             pa_log_error("Failed to create pa_processor. preprocessor(aec) will be disabled");
316             goto fail;
317         }
318
319         /* reference */
320         if (pa_processor_method_need_reference_structure(method)) {
321             pa_sample_spec request_ss;
322             pa_processor_reference *reference;
323             const char *ref_sink;
324             pa_sink *sink;
325
326             ref_sink = pa_proplist_gets(data->proplist, PA_PROP_MEDIA_ECHO_CANCEL_REFERENCE_SINK);
327             if (!ref_sink) {
328                 pa_log_error("Failed to get ref sink");
329                 goto fail;
330             }
331
332             sink = convert_reference_str_to_sink(core, ref_sink);
333             if (!sink) {
334                 pa_log_error("Failed to convert sink");
335                 goto fail;
336             }
337
338             request_ss = data->sample_spec;
339             if (method == PROCESSOR_METHOD_REFERENCE_COPY) {
340                 request_ss.channels = 1;
341                 data->flags |= PA_SOURCE_OUTPUT_NO_REMAP;
342             }
343
344             reference = pa_processor_reference_new_custom(core, sink, &sink->sample_spec,
345                                                             &request_ss, process_usec,
346                                                             PROCESSOR_REFERENCE_METHOD_NONE);
347             if (!reference) {
348                 pa_processor_free(processor);
349                 pa_log_error("Failed to create reference custom. processor_str(%s)", processor_str);
350                 goto fail;
351             }
352
353             /* holder -> reference -> processor */
354             pa_processor_attach_reference(processor, reference);
355             if (pa_processor_holder_connect_reference(holder, reference) < 0) {
356                 pa_processor_reference_free(reference);
357                 pa_processor_free(processor);
358                 pa_log_error("Failed to connect holder to reference");
359                 goto fail;
360             }
361         }
362
363         pa_processor_holder_register_processor_sequencial(holder, processor);
364
365         pa_log_info("processor was created. processor(%s), process_msec(%lu)",
366                                 processor_str, process_usec / PA_USEC_PER_MSEC);
367
368         pa_xfree(processor_str);
369     }
370
371     return holder;
372
373 fail:
374     if (holder)
375         pa_processor_holder_free(holder);
376     if (processor_str)
377         pa_xfree(processor_str);
378
379     return NULL;
380 }
381
382 static void destroy_source_output_preprocessor(pa_source_output *o) {
383     pa_processor_holder *holder;
384
385     pa_assert(o);
386
387     holder = (pa_processor_holder *)o->thread_info.processor_holder;
388     if (holder) {
389         pa_processor_holder_free(holder);
390         o->thread_info.processor_holder = NULL;
391     }
392
393     o->preprocess = NULL;
394     o->thread_info.processor_holder = NULL;
395 }
396
397 static int preprocess(pa_source_output *o, pa_memchunk *chunk, pa_memchunk *ochunk) {
398     pa_processor_holder *holder;
399     int ret;
400
401     pa_assert(o);
402     pa_assert(chunk);
403     pa_assert(ochunk);
404     pa_assert(o->thread_info.processor_holder);
405
406     holder = (pa_processor_holder *)o->thread_info.processor_holder;
407
408     /* chunk must contain resampled sound pcm */
409     pa_processor_holder_push_data(holder, chunk);
410
411     ret = pa_processor_holder_pump(holder);
412     if (ret != PROCESSOR_OK) {
413         if (ret != -PROCESSOR_ERR_BUFFERING)
414             pa_log_warn("Failed to pump holder. ret(%x)", ret);
415
416         return ret;
417     }
418
419     pa_processor_holder_pull_data(holder, ochunk);
420
421     return 0;
422 }
423
424 /* rendering thread is separated because ec/ns takes much time in I/O thread */
425 static int process_msg(
426         pa_msgobject *o,
427         int code,
428         void *data,
429         int64_t offset,
430         pa_memchunk *chunk) {
431
432     struct userdata *u = PA_PREPROCESSOR(o)->u;
433
434     switch (code) {
435         case PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT: {
436             pa_source_output *output;
437             pa_processor_holder *holder;
438
439             output = PA_SOURCE_OUTPUT(data);
440             pa_assert(output);
441
442             holder = (pa_processor_holder *)output->thread_info.processor_holder;
443             pa_assert(holder);
444
445             pa_processor_holder_set_current_source(holder, output->source);
446             output->preprocess = preprocess;
447             u->enable_in_thread = true;
448             u->reset_lazy_reference = true;
449
450             pa_processor_holder_dump(holder);
451
452             pa_log_info("source-output with a preprocessor will be added");
453
454             break;
455         }
456         case PA_SOURCE_MESSAGE_PREPROCESSOR_REMOVE_OUTPUT: {
457             pa_source_output *output;
458             pa_processor_holder *holder;
459
460             output = PA_SOURCE_OUTPUT(data);
461             pa_assert(output);
462
463             holder = (pa_processor_holder *)output->thread_info.processor_holder;
464             pa_assert(holder);
465
466             pa_processor_holder_set_current_source(holder, NULL);
467             output->preprocess = NULL;
468             u->enable_in_thread = false;
469
470             pa_processor_holder_dump(holder);
471
472             pa_log_info("source-output with a preprocessor will be removed");
473
474             break;
475         }
476         case PA_SOURCE_MESSAGE_PREPROCESSOR_PUSH_DATA:
477             if (!u->enable_in_thread)
478                 break;
479
480             pa_source *source = PA_SOURCE(data);
481             pa_source_post(source, chunk);
482
483             break;
484         case PA_SOURCE_MESSAGE_PREPROCESSOR_PUSH_REFERENCE: {
485             pa_processor_holder *holder = (pa_processor_holder *)data;
486             pa_sink *sink;
487             pa_source *source;
488             pa_processor_reference *reference;
489
490             if (!u->enable_in_thread)
491                 break;
492
493             reference = pa_processor_holder_get_connected_processor_reference(holder);
494             pa_assert(reference);
495
496             sink = pa_processor_reference_get_sink(reference);
497             source = pa_processor_holder_get_current_source(holder);
498             pa_assert(sink);
499             pa_assert(source);
500
501             /* first reference memchunk after connecting source-output */
502             if (u->reset_lazy_reference) {
503                 pa_processor_reference_reset(reference);
504                 pa_processor_reference_add_latency_padding(reference, get_round_trip_latency(source, sink));
505
506                 u->reset_lazy_reference = false;
507             }
508
509             if (pa_processor_holder_push_reference_data(holder, chunk) < 0)
510                 pa_log_error("Failed to push reference data");
511
512             break;
513         }
514         case PA_SOURCE_MESSAGE_PREPROCESSOR_RESET_REFERENCE:
515             if (!u->enable_in_thread)
516                 break;
517
518             if (!!data)
519                 u->reset_lazy_reference = true;
520
521             break;
522         case PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY: {
523             pa_source_output *output;
524
525             output = PA_SOURCE_OUTPUT(data);
526             pa_assert(output);
527
528             destroy_source_output_preprocessor(output);
529
530             u->enable_in_thread = false;
531
532             break;
533         }
534         case PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE:
535             terminate_preprocessor_holder(u);
536             u->enable_in_thread = false;
537             break;
538         default:
539             pa_assert(0);
540             break;
541     }
542
543     return 0;
544 }
545
546 static pa_hook_result_t source_output_fixate_cb(pa_core *c, pa_source_output_new_data *data, void *userdata) {
547     struct userdata *u = (struct userdata *)userdata;
548     pa_processor_holder *holder;
549
550     pa_assert(c);
551     pa_assert(u);
552     pa_assert(data);
553
554     if (!is_preprocessor_source_output(data->proplist))
555         return PA_HOOK_OK;
556
557     if (lookup_preprocessor_exist(c)) {
558         pa_log_warn("Don't allow two instances");
559         return PA_HOOK_OK;
560     }
561
562     holder = build_processor_holder(c, data);
563     if (!holder) {
564         pa_log_error("Failed to build holder");
565         return PA_HOOK_CANCEL;
566     }
567
568     data->processor_holder = (void *)holder;
569     if (!data->processor_holder) {
570         pa_log_error("Failed to create processor_holder");
571         goto fail;
572     }
573
574     data->flags |= PA_SOURCE_OUTPUT_PREPROCESSOR;
575
576     pa_processor_holder_set_private_data(holder, u->preprocessor, u->asyncmsgq_source);
577
578     return PA_HOOK_OK;
579
580 fail:
581     data->flags &= ~ PA_SOURCE_OUTPUT_PREPROCESSOR;
582     pa_processor_holder_free(holder);
583
584     return PA_HOOK_OK;
585 }
586
587 static pa_hook_result_t source_output_put_cb(pa_core *c, pa_source_output *o, void *userdata) {
588     struct userdata *u = (struct userdata *)userdata;
589     pa_processor_holder *holder;
590
591     pa_assert(c);
592     pa_assert(o);
593     pa_assert(u);
594
595     if (!is_preprocessor_marked(o->flags))
596         return PA_HOOK_OK;
597
598     holder = (pa_processor_holder *)o->thread_info.processor_holder;
599     if (!holder) {
600         pa_log_error("Failed to get processor holder");
601         return PA_HOOK_OK;
602     }
603
604     if (!proplist_test_tizen_version2(o->source->proplist)) {
605         pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
606         pa_assert(msgobject);
607
608         process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT, o);
609
610         connect_to_reference_sink(holder, PA_MSGOBJECT(u->preprocessor), u->asyncmsgq_sink, true);
611
612         return PA_HOOK_OK;
613     }
614
615     connect_to_reference_sink(holder, PA_MSGOBJECT(u->preprocessor), u->asyncmsgq_sink, true);
616
617     return PA_HOOK_OK;
618 }
619
620 static pa_hook_result_t source_output_unlink_cb(pa_core *c, pa_source_output *o, void *userdata) {
621     struct userdata *u = (struct userdata *)userdata;
622     pa_processor_holder *holder;
623
624     pa_assert(c);
625     pa_assert(o);
626     pa_assert(u);
627
628     if (!is_preprocessor_marked(o->flags))
629         return PA_HOOK_OK;
630
631     holder = (pa_processor_holder *)o->thread_info.processor_holder;
632     if (!holder) {
633         pa_log_error("Failed to get processor holder");
634         return PA_HOOK_OK;
635     }
636
637     /* in case of normal source(not tizen2)
638      * This should be destroy in unlink process
639      * because o->source wouldn't exist in the unlink_post step. */
640     if (!proplist_test_tizen_version2(o->source->proplist)) {
641         pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
642         pa_assert(msgobject);
643
644         process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY, o);
645
646         connect_to_reference_sink(holder, NULL, NULL, false);
647     }
648
649     return PA_HOOK_OK;
650 }
651
652 /* This function will be also called when source unlink */
653 static pa_hook_result_t source_output_unlink_post_cb(pa_core *c, pa_source_output *o, void *userdata) {
654     struct userdata *u = (struct userdata *)userdata;
655     pa_processor_holder *holder;
656
657     pa_assert(c);
658     pa_assert(o);
659     pa_assert(u);
660
661     if (!is_preprocessor_marked(o->flags))
662         return PA_HOOK_OK;
663
664     holder = (pa_processor_holder *)o->thread_info.processor_holder;
665     if (!holder) {
666         pa_log_info("holder doesn't exist. source-output might not be connected to tizen2 source");
667         return PA_HOOK_OK;
668     }
669
670     connect_to_reference_sink(holder, NULL, NULL, false);
671
672     pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->preprocessor),
673                         PA_SOURCE_MESSAGE_PREPROCESSOR_DESTROY, (void *)o, 0, NULL);
674
675     return PA_HOOK_OK;
676 }
677
678 static pa_hook_result_t source_output_move_start_cb(pa_core *c, pa_source_output *o, void *userdata) {
679     struct userdata *u = (struct userdata *)userdata;
680
681     pa_assert(c);
682     pa_assert(o);
683     pa_assert(u);
684
685     if (!proplist_test_tizen_version2(o->source->proplist)) {
686         pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
687         pa_assert(msgobject);
688
689         process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_REMOVE_OUTPUT, o);
690     }
691
692     return PA_HOOK_OK;
693 }
694
695 static pa_hook_result_t source_output_move_finish_cb(pa_core *c, pa_source_output *o, void *userdata) {
696     struct userdata *u = (struct userdata *)userdata;
697
698     pa_assert(c);
699     pa_assert(o);
700     pa_assert(u);
701
702     if (!proplist_test_tizen_version2(o->source->proplist)) {
703         pa_msgobject *msgobject = PA_MSGOBJECT(u->preprocessor);
704         pa_assert(msgobject);
705
706         process_msg_main_thread(msgobject, PA_SOURCE_MESSAGE_PREPROCESSOR_ADD_OUTPUT, o);
707     }
708
709     return PA_HOOK_OK;
710 }
711
712 static void thread_func(void *userdata) {
713     struct userdata *u = (struct userdata *)userdata;
714
715     pa_assert(u);
716
717     pa_log_debug("Thread starting up");
718
719 #if 0 // FIXME: RT disabled due to segfault when invoking tflite
720     if (u->core->realtime_scheduling)
721         pa_thread_make_realtime(u->core->realtime_priority);
722 #endif
723
724     pa_thread_mq_install(&u->thread_mq);
725
726     for (;;) {
727         int ret;
728
729         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
730             goto fail;
731     }
732
733 fail:
734     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core),
735                             PA_CORE_MESSAGE_UNLOAD_MODULE, u->m, 0, NULL, NULL);
736     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
737
738     pa_thread_mq_done(&u->thread_mq);
739
740     pa_log_debug("Thread shutting down");
741 }
742
743 int pa__init(pa_module *m) {
744     pa_modargs *ma = NULL;
745     struct userdata *u = NULL;
746
747     pa_assert(m);
748
749     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
750         pa_log_error("Failed to parse module arguments.");
751         return -1;
752     }
753
754     m->userdata = u = pa_xnew0(struct userdata, 1);
755     u->core = m->core;
756     u->m = m;
757
758     u->preprocessor = pa_msgobject_new(pa_preprocessor);
759     u->preprocessor->parent.process_msg = process_msg;
760     u->preprocessor->u = u;
761
762     u->rtpoll = pa_rtpoll_new();
763     u->asyncmsgq_source = pa_asyncmsgq_new(0);
764     u->asyncmsgq_sink = pa_asyncmsgq_new(0);
765
766     pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY, u->asyncmsgq_sink);
767     pa_rtpoll_item_new_asyncmsgq_read(u->rtpoll, PA_RTPOLL_EARLY, u->asyncmsgq_source);
768     pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
769
770     if (!(u->thread = pa_thread_new("tizenaudio-preprocessor", thread_func, u))) {
771         pa_log_error("Failed to create thread.");
772         goto fail;
773     }
774
775     u->source_output_unlink_slot =
776         pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK],
777                     PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_cb, u);
778
779     u->source_output_unlink_post_slot =
780         pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_UNLINK_POST],
781                     PA_HOOK_EARLY, (pa_hook_cb_t) source_output_unlink_post_cb, u);
782
783     /* source_output_fixate_cb must be called after new_cb callback in stream manager
784      * because stream-manager converts the device_id to the index of the sink */
785     u->source_output_fixate_slot =
786         pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_FIXATE],
787                     PA_HOOK_LATE, (pa_hook_cb_t) source_output_fixate_cb, u);
788
789     u->source_output_put_slot =
790         pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_PUT],
791                     PA_HOOK_LATE, (pa_hook_cb_t) source_output_put_cb, u);
792
793     u->source_output_move_start_slot =
794         pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_START],
795                     PA_HOOK_LATE, (pa_hook_cb_t) source_output_move_start_cb, u);
796
797     u->source_output_move_finish_slot =
798         pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SOURCE_OUTPUT_MOVE_FINISH],
799                     PA_HOOK_LATE, (pa_hook_cb_t) source_output_move_finish_cb, u);
800
801     /* TODO : need to check sink configuration change */
802     pa_modargs_free(ma);
803
804     return 0;
805
806 fail:
807     if (ma)
808         pa_modargs_free(ma);
809
810     pa__done(m);
811
812     return -1;
813 }
814
815 void pa__done(pa_module *m) {
816     struct userdata *u;
817
818     pa_assert(m);
819
820     if (!(u = m->userdata))
821         return;
822
823     pa_asyncmsgq_send(u->thread_mq.inq, PA_MSGOBJECT(u->preprocessor),
824                         PA_SOURCE_MESSAGE_PREPROCESSOR_TERMINATE, NULL, 0, NULL);
825
826     if (u->source_output_unlink_post_slot)
827         pa_hook_slot_free(u->source_output_unlink_post_slot);
828
829     if (u->source_output_fixate_slot)
830         pa_hook_slot_free(u->source_output_fixate_slot);
831
832     if (u->source_output_put_slot)
833         pa_hook_slot_free(u->source_output_put_slot);
834
835     if (u->asyncmsgq_sink)
836         pa_asyncmsgq_unref(u->asyncmsgq_sink);
837
838     if (u->asyncmsgq_source)
839         pa_asyncmsgq_unref(u->asyncmsgq_source);
840
841     if (u->rtpoll)
842         pa_rtpoll_free(u->rtpoll);
843
844     pa_thread_mq_done(&u->thread_mq);
845
846     pa_xfree(u);
847 }
848