echo-cance: Make Adrian canceller optional
[profile/ivi/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
58 PA_MODULE_AUTHOR("Wim Taymans");
59 PA_MODULE_DESCRIPTION("Echo Cancellation");
60 PA_MODULE_VERSION(PACKAGE_VERSION);
61 PA_MODULE_LOAD_ONCE(FALSE);
62 PA_MODULE_USAGE(
63         _("source_name=<name for the source> "
64           "source_properties=<properties for the source> "
65           "source_master=<name of source to filter> "
66           "sink_name=<name for the sink> "
67           "sink_properties=<properties for the sink> "
68           "sink_master=<name of sink to filter> "
69           "adjust_time=<how often to readjust rates in s> "
70           "adjust_threshold=<how much drift to readjust after in ms> "
71           "format=<sample format> "
72           "rate=<sample rate> "
73           "channels=<number of channels> "
74           "channel_map=<channel map> "
75           "aec_method=<implementation to use> "
76           "aec_args=<parameters for the AEC engine> "
77           "save_aec=<save AEC data in /tmp> "
78           "autoloaded=<set if this module is being loaded automatically> "
79           "use_volume_sharing=<yes or no> "
80         ));
81
82 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
83 typedef enum {
84     PA_ECHO_CANCELLER_INVALID = -1,
85 #ifdef HAVE_SPEEX
86     PA_ECHO_CANCELLER_SPEEX,
87 #endif
88 #ifdef HAVE_ADRIAN_EC
89     PA_ECHO_CANCELLER_ADRIAN,
90 #endif
91 #ifdef HAVE_WEBRTC
92     PA_ECHO_CANCELLER_WEBRTC,
93 #endif
94 } pa_echo_canceller_method_t;
95
96 #ifdef HAVE_WEBRTC
97 #define DEFAULT_ECHO_CANCELLER "webrtc"
98 #else
99 #define DEFAULT_ECHO_CANCELLER "speex"
100 #endif
101
102 static const pa_echo_canceller ec_table[] = {
103 #ifdef HAVE_SPEEX
104     {
105         /* Speex */
106         .init                   = pa_speex_ec_init,
107         .run                    = pa_speex_ec_run,
108         .done                   = pa_speex_ec_done,
109     },
110 #endif
111 #ifdef HAVE_ADRIAN_EC
112     {
113         /* Adrian Andre's NLMS implementation */
114         .init                   = pa_adrian_ec_init,
115         .run                    = pa_adrian_ec_run,
116         .done                   = pa_adrian_ec_done,
117     },
118 #endif
119 #ifdef HAVE_WEBRTC
120     {
121         /* WebRTC's audio processing engine */
122         .init                   = pa_webrtc_ec_init,
123         .play                   = pa_webrtc_ec_play,
124         .record                 = pa_webrtc_ec_record,
125         .set_drift              = pa_webrtc_ec_set_drift,
126         .run                    = pa_webrtc_ec_run,
127         .done                   = pa_webrtc_ec_done,
128     },
129 #endif
130 };
131
132 #define DEFAULT_RATE 32000
133 #define DEFAULT_CHANNELS 1
134 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
135 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
136 #define DEFAULT_SAVE_AEC FALSE
137 #define DEFAULT_AUTOLOADED FALSE
138
139 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
140
141 /* Can only be used in main context */
142 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
143                       (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
144
145 /* This module creates a new (virtual) source and sink.
146  *
147  * The data sent to the new sink is kept in a memblockq before being
148  * forwarded to the real sink_master.
149  *
150  * Data read from source_master is matched against the saved sink data and
151  * echo canceled data is then pushed onto the new source.
152  *
153  * Both source and sink masters have their own threads to push/pull data
154  * respectively. We however perform all our actions in the source IO thread.
155  * To do this we send all played samples to the source IO thread where they
156  * are then pushed into the memblockq.
157  *
158  * Alignment is performed in two steps:
159  *
160  * 1) when something happens that requires quick adjustment of the alignment of
161  *    capture and playback samples, we perform a resync. This adjusts the
162  *    position in the playback memblock to the requested sample. Quick
163  *    adjustments include moving the playback samples before the capture
164  *    samples (because else the echo canceler does not work) or when the
165  *    playback pointer drifts too far away.
166  *
167  * 2) periodically check the difference between capture and playback. we use a
168  *    low and high watermark for adjusting the alignment. playback should always
169  *    be before capture and the difference should not be bigger than one frame
170  *    size. We would ideally like to resample the sink_input but most driver
171  *    don't give enough accuracy to be able to do that right now.
172  */
173
174 struct userdata;
175
176 struct pa_echo_canceller_msg {
177     pa_msgobject parent;
178     struct userdata *userdata;
179 };
180
181 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
182 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
183
184 struct snapshot {
185     pa_usec_t sink_now;
186     pa_usec_t sink_latency;
187     size_t sink_delay;
188     int64_t send_counter;
189
190     pa_usec_t source_now;
191     pa_usec_t source_latency;
192     size_t source_delay;
193     int64_t recv_counter;
194     size_t rlen;
195     size_t plen;
196 };
197
198 struct userdata {
199     pa_core *core;
200     pa_module *module;
201
202     pa_bool_t autoloaded;
203     pa_bool_t dead;
204     pa_bool_t save_aec;
205
206     pa_echo_canceller *ec;
207     uint32_t blocksize;
208
209     pa_bool_t need_realign;
210
211     /* to wakeup the source I/O thread */
212     pa_asyncmsgq *asyncmsgq;
213     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
214
215     pa_source *source;
216     pa_bool_t source_auto_desc;
217     pa_source_output *source_output;
218     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
219     size_t source_skip;
220
221     pa_sink *sink;
222     pa_bool_t sink_auto_desc;
223     pa_sink_input *sink_input;
224     pa_memblockq *sink_memblockq;
225     int64_t send_counter;          /* updated in sink IO thread */
226     int64_t recv_counter;
227     size_t sink_skip;
228
229     /* Bytes left over from previous iteration */
230     size_t sink_rem;
231     size_t source_rem;
232
233     pa_atomic_t request_resync;
234
235     pa_time_event *time_event;
236     pa_usec_t adjust_time;
237     int adjust_threshold;
238
239     FILE *captured_file;
240     FILE *played_file;
241     FILE *canceled_file;
242     FILE *drift_file;
243
244     pa_bool_t use_volume_sharing;
245
246     struct {
247         pa_cvolume current_volume;
248     } thread_info;
249 };
250
251 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
252
253 static const char* const valid_modargs[] = {
254     "source_name",
255     "source_properties",
256     "source_master",
257     "sink_name",
258     "sink_properties",
259     "sink_master",
260     "adjust_time",
261     "adjust_threshold",
262     "format",
263     "rate",
264     "channels",
265     "channel_map",
266     "aec_method",
267     "aec_args",
268     "save_aec",
269     "autoloaded",
270     "use_volume_sharing",
271     NULL
272 };
273
274 enum {
275     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
276     SOURCE_OUTPUT_MESSAGE_REWIND,
277     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
278     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
279 };
280
281 enum {
282     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
283 };
284
285 enum {
286     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
287 };
288
289 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
290     int64_t buffer, diff_time, buffer_latency;
291
292     /* get the number of samples between capture and playback */
293     if (snapshot->plen > snapshot->rlen)
294         buffer = snapshot->plen - snapshot->rlen;
295     else
296         buffer = 0;
297
298     buffer += snapshot->source_delay + snapshot->sink_delay;
299
300     /* add the amount of samples not yet transferred to the source context */
301     if (snapshot->recv_counter <= snapshot->send_counter)
302         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
303     else
304         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
305
306     /* convert to time */
307     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
308
309     /* capture and playback samples are perfectly aligned when diff_time is 0 */
310     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
311           (snapshot->source_now - snapshot->source_latency);
312
313     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
314         (long long) snapshot->sink_latency,
315         (long long) buffer_latency, (long long) snapshot->source_latency,
316         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
317         (long long) (snapshot->send_counter - snapshot->recv_counter),
318         (long long) (snapshot->sink_now - snapshot->source_now));
319
320     return diff_time;
321 }
322
323 /* Called from main context */
324 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
325     struct userdata *u = userdata;
326     uint32_t old_rate, base_rate, new_rate;
327     int64_t diff_time;
328     /*size_t fs*/
329     struct snapshot latency_snapshot;
330
331     pa_assert(u);
332     pa_assert(a);
333     pa_assert(u->time_event == e);
334     pa_assert_ctl_context();
335
336     if (!IS_ACTIVE(u))
337         return;
338
339     /* update our snapshots */
340     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
341     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
342
343     /* calculate drift between capture and playback */
344     diff_time = calc_diff(u, &latency_snapshot);
345
346     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
347     old_rate = u->sink_input->sample_spec.rate;
348     base_rate = u->source_output->sample_spec.rate;
349
350     if (diff_time < 0) {
351         /* recording before playback, we need to adjust quickly. The echo
352          * canceler does not work in this case. */
353         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
354             NULL, diff_time, NULL, NULL);
355         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
356         new_rate = base_rate;
357     }
358     else {
359         if (diff_time > u->adjust_threshold) {
360             /* diff too big, quickly adjust */
361             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
362                 NULL, diff_time, NULL, NULL);
363         }
364
365         /* recording behind playback, we need to slowly adjust the rate to match */
366         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
367
368         /* assume equal samplerates for now */
369         new_rate = base_rate;
370     }
371
372     /* make sure we don't make too big adjustments because that sounds horrible */
373     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
374         new_rate = base_rate;
375
376     if (new_rate != old_rate) {
377         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
378
379         pa_sink_input_set_rate(u->sink_input, new_rate);
380     }
381
382     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
383 }
384
385 /* Called from source I/O thread context */
386 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
387     struct userdata *u = PA_SOURCE(o)->userdata;
388
389     switch (code) {
390
391         case PA_SOURCE_MESSAGE_GET_LATENCY:
392
393             /* The source is _put() before the source output is, so let's
394              * make sure we don't access it in that time. Also, the
395              * source output is first shut down, the source second. */
396             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
397                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
398                 *((pa_usec_t*) data) = 0;
399                 return 0;
400             }
401
402             *((pa_usec_t*) data) =
403
404                 /* Get the latency of the master source */
405                 pa_source_get_latency_within_thread(u->source_output->source) +
406                 /* Add the latency internal to our source output on top */
407                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
408                 /* and the buffering we do on the source */
409                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
410
411             return 0;
412
413         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
414             u->thread_info.current_volume = u->source->reference_volume;
415             break;
416     }
417
418     return pa_source_process_msg(o, code, data, offset, chunk);
419 }
420
421 /* Called from sink I/O thread context */
422 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
423     struct userdata *u = PA_SINK(o)->userdata;
424
425     switch (code) {
426
427         case PA_SINK_MESSAGE_GET_LATENCY:
428
429             /* The sink is _put() before the sink input is, so let's
430              * make sure we don't access it in that time. Also, the
431              * sink input is first shut down, the sink second. */
432             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
433                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
434                 *((pa_usec_t*) data) = 0;
435                 return 0;
436             }
437
438             *((pa_usec_t*) data) =
439
440                 /* Get the latency of the master sink */
441                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
442
443                 /* Add the latency internal to our sink input on top */
444                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
445
446             return 0;
447     }
448
449     return pa_sink_process_msg(o, code, data, offset, chunk);
450 }
451
452
453 /* Called from main context */
454 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
455     struct userdata *u;
456
457     pa_source_assert_ref(s);
458     pa_assert_se(u = s->userdata);
459
460     if (!PA_SOURCE_IS_LINKED(state) ||
461         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
462         return 0;
463
464     if (state == PA_SOURCE_RUNNING) {
465         /* restart timer when both sink and source are active */
466         if (IS_ACTIVE(u) && u->adjust_time)
467             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
468
469         pa_atomic_store(&u->request_resync, 1);
470         pa_source_output_cork(u->source_output, FALSE);
471     } else if (state == PA_SOURCE_SUSPENDED) {
472         pa_source_output_cork(u->source_output, TRUE);
473     }
474
475     return 0;
476 }
477
478 /* Called from main context */
479 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
480     struct userdata *u;
481
482     pa_sink_assert_ref(s);
483     pa_assert_se(u = s->userdata);
484
485     if (!PA_SINK_IS_LINKED(state) ||
486         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
487         return 0;
488
489     if (state == PA_SINK_RUNNING) {
490         /* restart timer when both sink and source are active */
491         if (IS_ACTIVE(u) && u->adjust_time)
492             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
493
494         pa_atomic_store(&u->request_resync, 1);
495         pa_sink_input_cork(u->sink_input, FALSE);
496     } else if (state == PA_SINK_SUSPENDED) {
497         pa_sink_input_cork(u->sink_input, TRUE);
498     }
499
500     return 0;
501 }
502
503 /* Called from I/O thread context */
504 static void source_update_requested_latency_cb(pa_source *s) {
505     struct userdata *u;
506
507     pa_source_assert_ref(s);
508     pa_assert_se(u = s->userdata);
509
510     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
511         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
512         return;
513
514     pa_log_debug("Source update requested latency");
515
516     /* Just hand this one over to the master source */
517     pa_source_output_set_requested_latency_within_thread(
518             u->source_output,
519             pa_source_get_requested_latency_within_thread(s));
520 }
521
522 /* Called from I/O thread context */
523 static void sink_update_requested_latency_cb(pa_sink *s) {
524     struct userdata *u;
525
526     pa_sink_assert_ref(s);
527     pa_assert_se(u = s->userdata);
528
529     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
530         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
531         return;
532
533     pa_log_debug("Sink update requested latency");
534
535     /* Just hand this one over to the master sink */
536     pa_sink_input_set_requested_latency_within_thread(
537             u->sink_input,
538             pa_sink_get_requested_latency_within_thread(s));
539 }
540
541 /* Called from I/O thread context */
542 static void sink_request_rewind_cb(pa_sink *s) {
543     struct userdata *u;
544
545     pa_sink_assert_ref(s);
546     pa_assert_se(u = s->userdata);
547
548     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
549         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
550         return;
551
552     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
553
554     /* Just hand this one over to the master sink */
555     pa_sink_input_request_rewind(u->sink_input,
556                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
557 }
558
559 /* Called from main context */
560 static void source_set_volume_cb(pa_source *s) {
561     struct userdata *u;
562
563     pa_source_assert_ref(s);
564     pa_assert_se(u = s->userdata);
565
566     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
567         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
568         return;
569
570     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
571 }
572
573 /* Called from main context */
574 static void sink_set_volume_cb(pa_sink *s) {
575     struct userdata *u;
576
577     pa_sink_assert_ref(s);
578     pa_assert_se(u = s->userdata);
579
580     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
581         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
582         return;
583
584     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
585 }
586
587 static void source_get_volume_cb(pa_source *s) {
588     struct userdata *u;
589     pa_cvolume v;
590
591     pa_source_assert_ref(s);
592     pa_assert_se(u = s->userdata);
593
594     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
595         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
596         return;
597
598     pa_source_output_get_volume(u->source_output, &v, TRUE);
599
600     if (pa_cvolume_equal(&s->real_volume, &v))
601         /* no change */
602         return;
603
604     s->real_volume = v;
605     pa_source_set_soft_volume(s, NULL);
606 }
607
608 /* Called from main context */
609 static void source_set_mute_cb(pa_source *s) {
610     struct userdata *u;
611
612     pa_source_assert_ref(s);
613     pa_assert_se(u = s->userdata);
614
615     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
616         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
617         return;
618
619     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
620 }
621
622 /* Called from main context */
623 static void sink_set_mute_cb(pa_sink *s) {
624     struct userdata *u;
625
626     pa_sink_assert_ref(s);
627     pa_assert_se(u = s->userdata);
628
629     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
630         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
631         return;
632
633     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
634 }
635
636 /* Called from main context */
637 static void source_get_mute_cb(pa_source *s) {
638     struct userdata *u;
639
640     pa_source_assert_ref(s);
641     pa_assert_se(u = s->userdata);
642
643     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
644         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
645         return;
646
647     pa_source_output_get_mute(u->source_output);
648 }
649
650 /* must be called from the input thread context */
651 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
652     int64_t diff;
653
654     if (diff_time < 0) {
655         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
656
657         if (diff > 0) {
658             /* add some extra safety samples to compensate for jitter in the
659              * timings */
660             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
661
662             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
663
664             u->sink_skip = diff;
665             u->source_skip = 0;
666         }
667     } else if (diff_time > 0) {
668         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
669
670         if (diff > 0) {
671             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
672
673             u->source_skip = diff;
674             u->sink_skip = 0;
675         }
676     }
677 }
678
679 /* must be called from the input thread */
680 static void do_resync(struct userdata *u) {
681     int64_t diff_time;
682     struct snapshot latency_snapshot;
683
684     pa_log("Doing resync");
685
686     /* update our snapshot */
687     source_output_snapshot_within_thread(u, &latency_snapshot);
688     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
689
690     /* calculate drift between capture and playback */
691     diff_time = calc_diff(u, &latency_snapshot);
692
693     /* and adjust for the drift */
694     apply_diff_time(u, diff_time);
695 }
696
697 /* 1. Calculate drift at this point, pass to canceller
698  * 2. Push out playback samples in blocksize chunks
699  * 3. Push out capture samples in blocksize chunks
700  * 4. ???
701  * 5. Profit
702  */
703 static void do_push_drift_comp(struct userdata *u) {
704     size_t rlen, plen;
705     pa_memchunk rchunk, pchunk, cchunk;
706     uint8_t *rdata, *pdata, *cdata;
707     float drift;
708     int unused PA_GCC_UNUSED;
709
710     rlen = pa_memblockq_get_length(u->source_memblockq);
711     plen = pa_memblockq_get_length(u->sink_memblockq);
712
713     /* Estimate snapshot drift as follows:
714      *   pd: amount of data consumed since last time
715      *   rd: amount of data consumed since last time
716      *
717      *   drift = (pd - rd) / rd;
718      *
719      * We calculate pd and rd as the memblockq length less the number of
720      * samples left from the last iteration (to avoid double counting
721      * those remainder samples.
722      */
723     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
724     u->sink_rem = plen % u->blocksize;
725     u->source_rem = rlen % u->blocksize;
726
727     /* Now let the canceller work its drift compensation magic */
728     u->ec->set_drift(u->ec, drift);
729
730     if (u->save_aec) {
731         if (u->drift_file)
732             fprintf(u->drift_file, "d %a\n", drift);
733     }
734
735     /* Send in the playback samples first */
736     while (plen >= u->blocksize) {
737         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
738         pdata = pa_memblock_acquire(pchunk.memblock);
739         pdata += pchunk.index;
740
741         u->ec->play(u->ec, pdata);
742
743         if (u->save_aec) {
744             if (u->drift_file)
745                 fprintf(u->drift_file, "p %d\n", u->blocksize);
746             if (u->played_file)
747                 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
748         }
749
750         pa_memblock_release(pchunk.memblock);
751         pa_memblockq_drop(u->sink_memblockq, u->blocksize);
752         pa_memblock_unref(pchunk.memblock);
753
754         plen -= u->blocksize;
755     }
756
757     /* And now the capture samples */
758     while (rlen >= u->blocksize) {
759         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
760
761         rdata = pa_memblock_acquire(rchunk.memblock);
762         rdata += rchunk.index;
763
764         cchunk.index = 0;
765         cchunk.length = u->blocksize;
766         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
767         cdata = pa_memblock_acquire(cchunk.memblock);
768
769         u->ec->record(u->ec, rdata, cdata);
770
771         if (u->save_aec) {
772             if (u->drift_file)
773                 fprintf(u->drift_file, "c %d\n", u->blocksize);
774             if (u->captured_file)
775                 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
776             if (u->canceled_file)
777                 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
778         }
779
780         pa_memblock_release(cchunk.memblock);
781         pa_memblock_release(rchunk.memblock);
782
783         pa_memblock_unref(rchunk.memblock);
784
785         pa_source_post(u->source, &cchunk);
786         pa_memblock_unref(cchunk.memblock);
787
788         pa_memblockq_drop(u->source_memblockq, u->blocksize);
789         rlen -= u->blocksize;
790     }
791 }
792
793 /* This one's simpler than the drift compensation case -- we just iterate over
794  * the capture buffer, and pass the canceller blocksize bytes of playback and
795  * capture data. */
796 static void do_push(struct userdata *u) {
797     size_t rlen, plen;
798     pa_memchunk rchunk, pchunk, cchunk;
799     uint8_t *rdata, *pdata, *cdata;
800     int unused PA_GCC_UNUSED;
801
802     rlen = pa_memblockq_get_length(u->source_memblockq);
803     plen = pa_memblockq_get_length(u->sink_memblockq);
804
805     while (rlen >= u->blocksize) {
806         /* take fixed block from recorded samples */
807         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
808
809         if (plen > u->blocksize) {
810             if (plen > u->blocksize) {
811                 /* take fixed block from played samples */
812                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
813
814                 rdata = pa_memblock_acquire(rchunk.memblock);
815                 rdata += rchunk.index;
816                 pdata = pa_memblock_acquire(pchunk.memblock);
817                 pdata += pchunk.index;
818
819                 cchunk.index = 0;
820                 cchunk.length = u->blocksize;
821                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
822                 cdata = pa_memblock_acquire(cchunk.memblock);
823
824                 if (u->save_aec) {
825                     if (u->captured_file)
826                         unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
827                     if (u->played_file)
828                         unused = fwrite(pdata, 1, u->blocksize, u->played_file);
829                 }
830
831                 /* perform echo cancellation */
832                 u->ec->run(u->ec, rdata, pdata, cdata);
833
834                 if (u->save_aec) {
835                     if (u->canceled_file)
836                         unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
837                 }
838
839                 pa_memblock_release(cchunk.memblock);
840                 pa_memblock_release(pchunk.memblock);
841                 pa_memblock_release(rchunk.memblock);
842
843                 /* drop consumed sink samples */
844                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
845                 pa_memblock_unref(pchunk.memblock);
846
847                 pa_memblock_unref(rchunk.memblock);
848                 /* the filtered samples now become the samples from our
849                  * source */
850                 rchunk = cchunk;
851
852                 plen -= u->blocksize;
853             }
854         }
855
856         /* forward the (echo-canceled) data to the virtual source */
857         pa_source_post(u->source, &rchunk);
858         pa_memblock_unref(rchunk.memblock);
859
860         pa_memblockq_drop(u->source_memblockq, u->blocksize);
861         rlen -= u->blocksize;
862     }
863 }
864
865 /* Called from input thread context */
866 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
867     struct userdata *u;
868     size_t rlen, plen, to_skip;
869     pa_memchunk rchunk;
870
871     pa_source_output_assert_ref(o);
872     pa_source_output_assert_io_context(o);
873     pa_assert_se(u = o->userdata);
874
875     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
876         pa_log("push when no link?");
877         return;
878     }
879
880     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
881                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
882         pa_source_post(u->source, chunk);
883         return;
884     }
885
886     /* handle queued messages, do any message sending of our own */
887     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
888         ;
889
890     pa_memblockq_push_align(u->source_memblockq, chunk);
891
892     rlen = pa_memblockq_get_length(u->source_memblockq);
893     plen = pa_memblockq_get_length(u->sink_memblockq);
894
895     /* Let's not do anything else till we have enough data to process */
896     if (rlen < u->blocksize)
897         return;
898
899     /* See if we need to drop samples in order to sync */
900     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
901         do_resync(u);
902     }
903
904     /* Okay, skip cancellation for skipped source samples if needed. */
905     if (PA_UNLIKELY(u->source_skip)) {
906         /* The slightly tricky bit here is that we drop all but modulo
907          * blocksize bytes and then adjust for that last bit on the sink side.
908          * We do this because the source data is coming at a fixed rate, which
909          * means the only way to try to catch up is drop sink samples and let
910          * the canceller cope up with this. */
911         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
912         to_skip -= to_skip % u->blocksize;
913
914         if (to_skip) {
915             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
916             pa_source_post(u->source, &rchunk);
917
918             pa_memblock_unref(rchunk.memblock);
919             pa_memblockq_drop(u->source_memblockq, u->blocksize);
920
921             rlen -= to_skip;
922             u->source_skip -= to_skip;
923         }
924
925         if (rlen && u->source_skip % u->blocksize) {
926             u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
927             u->source_skip -= (u->source_skip % u->blocksize);
928         }
929     }
930
931     /* And for the sink, these samples have been played back already, so we can
932      * just drop them and get on with it. */
933     if (PA_UNLIKELY(u->sink_skip)) {
934         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
935
936         pa_memblockq_drop(u->sink_memblockq, to_skip);
937
938         plen -= to_skip;
939         u->sink_skip -= to_skip;
940     }
941
942     /* process and push out samples */
943     if (u->ec->params.drift_compensation)
944         do_push_drift_comp(u);
945     else
946         do_push(u);
947 }
948
949 /* Called from I/O thread context */
950 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
951     struct userdata *u;
952
953     pa_sink_input_assert_ref(i);
954     pa_assert(chunk);
955     pa_assert_se(u = i->userdata);
956
957     if (u->sink->thread_info.rewind_requested)
958         pa_sink_process_rewind(u->sink, 0);
959
960     pa_sink_render_full(u->sink, nbytes, chunk);
961
962     if (i->thread_info.underrun_for > 0) {
963         pa_log_debug("Handling end of underrun.");
964         pa_atomic_store(&u->request_resync, 1);
965     }
966
967     /* let source thread handle the chunk. pass the sample count as well so that
968      * the source IO thread can update the right variables. */
969     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
970         NULL, 0, chunk, NULL);
971     u->send_counter += chunk->length;
972
973     return 0;
974 }
975
976 /* Called from input thread context */
977 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
978     struct userdata *u;
979
980     pa_source_output_assert_ref(o);
981     pa_source_output_assert_io_context(o);
982     pa_assert_se(u = o->userdata);
983
984     pa_source_process_rewind(u->source, nbytes);
985
986     /* go back on read side, we need to use older sink data for this */
987     pa_memblockq_rewind(u->sink_memblockq, nbytes);
988
989     /* manipulate write index */
990     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
991
992     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
993         (long long) pa_memblockq_get_length (u->source_memblockq));
994 }
995
996 /* Called from I/O thread context */
997 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
998     struct userdata *u;
999
1000     pa_sink_input_assert_ref(i);
1001     pa_assert_se(u = i->userdata);
1002
1003     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1004
1005     pa_sink_process_rewind(u->sink, nbytes);
1006
1007     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1008     u->send_counter -= nbytes;
1009 }
1010
1011 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1012     size_t delay, rlen, plen;
1013     pa_usec_t now, latency;
1014
1015     now = pa_rtclock_now();
1016     latency = pa_source_get_latency_within_thread(u->source_output->source);
1017     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1018
1019     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1020     rlen = pa_memblockq_get_length(u->source_memblockq);
1021     plen = pa_memblockq_get_length(u->sink_memblockq);
1022
1023     snapshot->source_now = now;
1024     snapshot->source_latency = latency;
1025     snapshot->source_delay = delay;
1026     snapshot->recv_counter = u->recv_counter;
1027     snapshot->rlen = rlen + u->sink_skip;
1028     snapshot->plen = plen + u->source_skip;
1029 }
1030
1031
1032 /* Called from output thread context */
1033 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1034     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1035
1036     switch (code) {
1037
1038         case SOURCE_OUTPUT_MESSAGE_POST:
1039
1040             pa_source_output_assert_io_context(u->source_output);
1041
1042             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1043                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1044             else
1045                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1046
1047             u->recv_counter += (int64_t) chunk->length;
1048
1049             return 0;
1050
1051         case SOURCE_OUTPUT_MESSAGE_REWIND:
1052             pa_source_output_assert_io_context(u->source_output);
1053
1054             /* manipulate write index, never go past what we have */
1055             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1056                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1057             else
1058                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1059
1060             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1061
1062             u->recv_counter -= offset;
1063
1064             return 0;
1065
1066         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1067             struct snapshot *snapshot = (struct snapshot *) data;
1068
1069             source_output_snapshot_within_thread(u, snapshot);
1070             return 0;
1071         }
1072
1073         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1074             apply_diff_time(u, offset);
1075             return 0;
1076
1077     }
1078
1079     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1080 }
1081
1082 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1083     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1084
1085     switch (code) {
1086
1087         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1088             size_t delay;
1089             pa_usec_t now, latency;
1090             struct snapshot *snapshot = (struct snapshot *) data;
1091
1092             pa_sink_input_assert_io_context(u->sink_input);
1093
1094             now = pa_rtclock_now();
1095             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1096             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1097
1098             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1099
1100             snapshot->sink_now = now;
1101             snapshot->sink_latency = latency;
1102             snapshot->sink_delay = delay;
1103             snapshot->send_counter = u->send_counter;
1104             return 0;
1105         }
1106     }
1107
1108     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1109 }
1110
1111 /* Called from I/O thread context */
1112 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1113     struct userdata *u;
1114
1115     pa_sink_input_assert_ref(i);
1116     pa_assert_se(u = i->userdata);
1117
1118     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1119
1120     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1121     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1122 }
1123
1124 /* Called from I/O thread context */
1125 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1126     struct userdata *u;
1127
1128     pa_source_output_assert_ref(o);
1129     pa_assert_se(u = o->userdata);
1130
1131     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1132
1133     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1134 }
1135
1136 /* Called from I/O thread context */
1137 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1138     struct userdata *u;
1139
1140     pa_sink_input_assert_ref(i);
1141     pa_assert_se(u = i->userdata);
1142
1143     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1144
1145     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1146 }
1147
1148 /* Called from I/O thread context */
1149 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1150     struct userdata *u;
1151     pa_usec_t latency;
1152
1153     pa_sink_input_assert_ref(i);
1154     pa_assert_se(u = i->userdata);
1155
1156     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1157
1158     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1159 }
1160
1161 /* Called from I/O thread context */
1162 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1163     struct userdata *u;
1164     pa_usec_t latency;
1165
1166     pa_source_output_assert_ref(o);
1167     pa_assert_se(u = o->userdata);
1168
1169     latency = pa_source_get_requested_latency_within_thread(o->source);
1170
1171     pa_log_debug("source output update requested latency %lld", (long long) latency);
1172 }
1173
1174 /* Called from I/O thread context */
1175 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1176     struct userdata *u;
1177
1178     pa_sink_input_assert_ref(i);
1179     pa_assert_se(u = i->userdata);
1180
1181     pa_log_debug("Sink input update latency range %lld %lld",
1182         (long long) i->sink->thread_info.min_latency,
1183         (long long) i->sink->thread_info.max_latency);
1184
1185     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1186 }
1187
1188 /* Called from I/O thread context */
1189 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1190     struct userdata *u;
1191
1192     pa_source_output_assert_ref(o);
1193     pa_assert_se(u = o->userdata);
1194
1195     pa_log_debug("Source output update latency range %lld %lld",
1196         (long long) o->source->thread_info.min_latency,
1197         (long long) o->source->thread_info.max_latency);
1198
1199     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1200 }
1201
1202 /* Called from I/O thread context */
1203 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1204     struct userdata *u;
1205
1206     pa_sink_input_assert_ref(i);
1207     pa_assert_se(u = i->userdata);
1208
1209     pa_log_debug("Sink input update fixed latency %lld",
1210         (long long) i->sink->thread_info.fixed_latency);
1211
1212     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1213 }
1214
1215 /* Called from I/O thread context */
1216 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1217     struct userdata *u;
1218
1219     pa_source_output_assert_ref(o);
1220     pa_assert_se(u = o->userdata);
1221
1222     pa_log_debug("Source output update fixed latency %lld",
1223         (long long) o->source->thread_info.fixed_latency);
1224
1225     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1226 }
1227
1228 /* Called from output thread context */
1229 static void source_output_attach_cb(pa_source_output *o) {
1230     struct userdata *u;
1231
1232     pa_source_output_assert_ref(o);
1233     pa_source_output_assert_io_context(o);
1234     pa_assert_se(u = o->userdata);
1235
1236     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1237     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1238     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1239     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1240
1241     pa_log_debug("Source output %d attach", o->index);
1242
1243     pa_source_attach_within_thread(u->source);
1244
1245     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1246             o->source->thread_info.rtpoll,
1247             PA_RTPOLL_LATE,
1248             u->asyncmsgq);
1249 }
1250
1251 /* Called from I/O thread context */
1252 static void sink_input_attach_cb(pa_sink_input *i) {
1253     struct userdata *u;
1254
1255     pa_sink_input_assert_ref(i);
1256     pa_assert_se(u = i->userdata);
1257
1258     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1259     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1260
1261     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1262      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1263     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1264
1265     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1266      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1267      * HERE. SEE (6) */
1268     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1269     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1270
1271     pa_log_debug("Sink input %d attach", i->index);
1272
1273     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1274             i->sink->thread_info.rtpoll,
1275             PA_RTPOLL_LATE,
1276             u->asyncmsgq);
1277
1278     pa_sink_attach_within_thread(u->sink);
1279 }
1280
1281
1282 /* Called from output thread context */
1283 static void source_output_detach_cb(pa_source_output *o) {
1284     struct userdata *u;
1285
1286     pa_source_output_assert_ref(o);
1287     pa_source_output_assert_io_context(o);
1288     pa_assert_se(u = o->userdata);
1289
1290     pa_source_detach_within_thread(u->source);
1291     pa_source_set_rtpoll(u->source, NULL);
1292
1293     pa_log_debug("Source output %d detach", o->index);
1294
1295     if (u->rtpoll_item_read) {
1296         pa_rtpoll_item_free(u->rtpoll_item_read);
1297         u->rtpoll_item_read = NULL;
1298     }
1299 }
1300
1301 /* Called from I/O thread context */
1302 static void sink_input_detach_cb(pa_sink_input *i) {
1303     struct userdata *u;
1304
1305     pa_sink_input_assert_ref(i);
1306     pa_assert_se(u = i->userdata);
1307
1308     pa_sink_detach_within_thread(u->sink);
1309
1310     pa_sink_set_rtpoll(u->sink, NULL);
1311
1312     pa_log_debug("Sink input %d detach", i->index);
1313
1314     if (u->rtpoll_item_write) {
1315         pa_rtpoll_item_free(u->rtpoll_item_write);
1316         u->rtpoll_item_write = NULL;
1317     }
1318 }
1319
1320 /* Called from output thread context */
1321 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1322     struct userdata *u;
1323
1324     pa_source_output_assert_ref(o);
1325     pa_source_output_assert_io_context(o);
1326     pa_assert_se(u = o->userdata);
1327
1328     pa_log_debug("Source output %d state %d", o->index, state);
1329 }
1330
1331 /* Called from IO thread context */
1332 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1333     struct userdata *u;
1334
1335     pa_sink_input_assert_ref(i);
1336     pa_assert_se(u = i->userdata);
1337
1338     pa_log_debug("Sink input %d state %d", i->index, state);
1339
1340     /* If we are added for the first time, ask for a rewinding so that
1341      * we are heard right-away. */
1342     if (PA_SINK_INPUT_IS_LINKED(state) &&
1343         i->thread_info.state == PA_SINK_INPUT_INIT) {
1344         pa_log_debug("Requesting rewind due to state change.");
1345         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1346     }
1347 }
1348
1349 /* Called from main thread */
1350 static void source_output_kill_cb(pa_source_output *o) {
1351     struct userdata *u;
1352
1353     pa_source_output_assert_ref(o);
1354     pa_assert_ctl_context();
1355     pa_assert_se(u = o->userdata);
1356
1357     u->dead = TRUE;
1358
1359     /* The order here matters! We first kill the source output, followed
1360      * by the source. That means the source callbacks must be protected
1361      * against an unconnected source output! */
1362     pa_source_output_unlink(u->source_output);
1363     pa_source_unlink(u->source);
1364
1365     pa_source_output_unref(u->source_output);
1366     u->source_output = NULL;
1367
1368     pa_source_unref(u->source);
1369     u->source = NULL;
1370
1371     pa_log_debug("Source output kill %d", o->index);
1372
1373     pa_module_unload_request(u->module, TRUE);
1374 }
1375
1376 /* Called from main context */
1377 static void sink_input_kill_cb(pa_sink_input *i) {
1378     struct userdata *u;
1379
1380     pa_sink_input_assert_ref(i);
1381     pa_assert_se(u = i->userdata);
1382
1383     u->dead = TRUE;
1384
1385     /* The order here matters! We first kill the sink input, followed
1386      * by the sink. That means the sink callbacks must be protected
1387      * against an unconnected sink input! */
1388     pa_sink_input_unlink(u->sink_input);
1389     pa_sink_unlink(u->sink);
1390
1391     pa_sink_input_unref(u->sink_input);
1392     u->sink_input = NULL;
1393
1394     pa_sink_unref(u->sink);
1395     u->sink = NULL;
1396
1397     pa_log_debug("Sink input kill %d", i->index);
1398
1399     pa_module_unload_request(u->module, TRUE);
1400 }
1401
1402 /* Called from main thread */
1403 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1404     struct userdata *u;
1405
1406     pa_source_output_assert_ref(o);
1407     pa_assert_ctl_context();
1408     pa_assert_se(u = o->userdata);
1409
1410     if (u->dead || u->autoloaded)
1411         return FALSE;
1412
1413     return (u->source != dest) && (u->sink != dest->monitor_of);
1414 }
1415
1416 /* Called from main context */
1417 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1418     struct userdata *u;
1419
1420     pa_sink_input_assert_ref(i);
1421     pa_assert_se(u = i->userdata);
1422
1423     if (u->dead || u->autoloaded)
1424         return FALSE;
1425
1426     return u->sink != dest;
1427 }
1428
1429 /* Called from main thread */
1430 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1431     struct userdata *u;
1432
1433     pa_source_output_assert_ref(o);
1434     pa_assert_ctl_context();
1435     pa_assert_se(u = o->userdata);
1436
1437     if (dest) {
1438         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1439         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1440     } else
1441         pa_source_set_asyncmsgq(u->source, NULL);
1442
1443     if (u->source_auto_desc && dest) {
1444         const char *y, *z;
1445         pa_proplist *pl;
1446
1447         pl = pa_proplist_new();
1448         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1449         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1450         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1451                 y ? y : u->sink_input->sink->name);
1452
1453         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1454         pa_proplist_free(pl);
1455     }
1456 }
1457
1458 /* Called from main context */
1459 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1460     struct userdata *u;
1461
1462     pa_sink_input_assert_ref(i);
1463     pa_assert_se(u = i->userdata);
1464
1465     if (dest) {
1466         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1467         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1468     } else
1469         pa_sink_set_asyncmsgq(u->sink, NULL);
1470
1471     if (u->sink_auto_desc && dest) {
1472         const char *y, *z;
1473         pa_proplist *pl;
1474
1475         pl = pa_proplist_new();
1476         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1477         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1478         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1479                          y ? y : u->source_output->source->name);
1480
1481         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1482         pa_proplist_free(pl);
1483     }
1484 }
1485
1486 /* Called from main context */
1487 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1488     struct userdata *u;
1489
1490     pa_sink_input_assert_ref(i);
1491     pa_assert_se(u = i->userdata);
1492
1493     pa_sink_volume_changed(u->sink, &i->volume);
1494 }
1495
1496 /* Called from main context */
1497 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1498     struct userdata *u;
1499
1500     pa_sink_input_assert_ref(i);
1501     pa_assert_se(u = i->userdata);
1502
1503     pa_sink_mute_changed(u->sink, i->muted);
1504 }
1505
1506 /* Called from main context */
1507 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1508     struct pa_echo_canceller_msg *msg;
1509     struct userdata *u;
1510
1511     pa_assert(o);
1512
1513     msg = PA_ECHO_CANCELLER_MSG(o);
1514     u = msg->userdata;
1515
1516     switch (code) {
1517         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1518             pa_cvolume *v = (pa_cvolume *) userdata;
1519
1520             if (u->use_volume_sharing)
1521                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1522             else
1523                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1524
1525             break;
1526         }
1527
1528         default:
1529             pa_assert_not_reached();
1530             break;
1531     }
1532
1533     return 0;
1534 }
1535
1536 /* Called by the canceller, so thread context */
1537 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1538     *v = ec->msg->userdata->thread_info.current_volume;
1539 }
1540
1541 /* Called by the canceller, so thread context */
1542 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1543     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1544         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1545
1546         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1547                 pa_xfree);
1548     }
1549 }
1550
1551 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1552 #ifdef HAVE_SPEEX
1553     if (pa_streq(method, "speex"))
1554         return PA_ECHO_CANCELLER_SPEEX;
1555 #endif
1556 #ifdef HAVE_ADRIAN_EC
1557     if (pa_streq(method, "adrian"))
1558         return PA_ECHO_CANCELLER_ADRIAN;
1559 #endif
1560 #ifdef HAVE_WEBRTC
1561     if (pa_streq(method, "webrtc"))
1562         return PA_ECHO_CANCELLER_WEBRTC;
1563 #endif
1564     return PA_ECHO_CANCELLER_INVALID;
1565 }
1566
1567 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1568 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1569     pa_echo_canceller_method_t ec_method;
1570
1571     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1572         pa_log("Invalid sample format specification or channel map");
1573         goto fail;
1574     }
1575
1576     u->ec = pa_xnew0(pa_echo_canceller, 1);
1577     if (!u->ec) {
1578         pa_log("Failed to alloc echo canceller");
1579         goto fail;
1580     }
1581
1582     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1583         pa_log("Invalid echo canceller implementation");
1584         goto fail;
1585     }
1586
1587     u->ec->init = ec_table[ec_method].init;
1588     u->ec->play = ec_table[ec_method].play;
1589     u->ec->record = ec_table[ec_method].record;
1590     u->ec->set_drift = ec_table[ec_method].set_drift;
1591     u->ec->run = ec_table[ec_method].run;
1592     u->ec->done = ec_table[ec_method].done;
1593
1594     return 0;
1595
1596 fail:
1597     return -1;
1598 }
1599
1600
1601 int pa__init(pa_module*m) {
1602     struct userdata *u;
1603     pa_sample_spec source_ss, sink_ss;
1604     pa_channel_map source_map, sink_map;
1605     pa_modargs *ma;
1606     pa_source *source_master=NULL;
1607     pa_sink *sink_master=NULL;
1608     pa_source_output_new_data source_output_data;
1609     pa_sink_input_new_data sink_input_data;
1610     pa_source_new_data source_data;
1611     pa_sink_new_data sink_data;
1612     pa_memchunk silence;
1613     uint32_t temp;
1614
1615     pa_assert(m);
1616
1617     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1618         pa_log("Failed to parse module arguments.");
1619         goto fail;
1620     }
1621
1622     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1623         pa_log("Master source not found");
1624         goto fail;
1625     }
1626     pa_assert(source_master);
1627
1628     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1629         pa_log("Master sink not found");
1630         goto fail;
1631     }
1632     pa_assert(sink_master);
1633
1634     if (source_master->monitor_of == sink_master) {
1635         pa_log("Can't cancel echo between a sink and its monitor");
1636         goto fail;
1637     }
1638
1639     source_ss = source_master->sample_spec;
1640     source_ss.rate = DEFAULT_RATE;
1641     source_ss.channels = DEFAULT_CHANNELS;
1642     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1643
1644     sink_ss = sink_master->sample_spec;
1645     sink_map = sink_master->channel_map;
1646
1647     u = pa_xnew0(struct userdata, 1);
1648     if (!u) {
1649         pa_log("Failed to alloc userdata");
1650         goto fail;
1651     }
1652     u->core = m->core;
1653     u->module = m;
1654     m->userdata = u;
1655     u->dead = FALSE;
1656
1657     u->use_volume_sharing = TRUE;
1658     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1659         pa_log("use_volume_sharing= expects a boolean argument");
1660         goto fail;
1661     }
1662
1663     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1664     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1665         pa_log("Failed to parse adjust_time value");
1666         goto fail;
1667     }
1668
1669     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1670         u->adjust_time = temp * PA_USEC_PER_SEC;
1671     else
1672         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1673
1674     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1675     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1676         pa_log("Failed to parse adjust_threshold value");
1677         goto fail;
1678     }
1679
1680     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1681         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1682     else
1683         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1684
1685     u->save_aec = DEFAULT_SAVE_AEC;
1686     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1687         pa_log("Failed to parse save_aec value");
1688         goto fail;
1689     }
1690
1691     u->autoloaded = DEFAULT_AUTOLOADED;
1692     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1693         pa_log("Failed to parse autoloaded value");
1694         goto fail;
1695     }
1696
1697     if (init_common(ma, u, &source_ss, &source_map))
1698         goto fail;
1699
1700     u->asyncmsgq = pa_asyncmsgq_new(0);
1701     u->need_realign = TRUE;
1702
1703     if (u->ec->init) {
1704         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1705             pa_log("Failed to init AEC engine");
1706             goto fail;
1707         }
1708     }
1709
1710     if (u->ec->params.drift_compensation)
1711         pa_assert(u->ec->set_drift);
1712
1713     /* Create source */
1714     pa_source_new_data_init(&source_data);
1715     source_data.driver = __FILE__;
1716     source_data.module = m;
1717     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1718         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1719     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1720     pa_source_new_data_set_channel_map(&source_data, &source_map);
1721     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1722     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1723     if (!u->autoloaded)
1724         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1725
1726     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1727         pa_log("Invalid properties");
1728         pa_source_new_data_done(&source_data);
1729         goto fail;
1730     }
1731
1732     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1733         const char *y, *z;
1734
1735         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1736         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1737         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1738                 z ? z : source_master->name, y ? y : sink_master->name);
1739     }
1740
1741     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1742                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1743     pa_source_new_data_done(&source_data);
1744
1745     if (!u->source) {
1746         pa_log("Failed to create source.");
1747         goto fail;
1748     }
1749
1750     u->source->parent.process_msg = source_process_msg_cb;
1751     u->source->set_state = source_set_state_cb;
1752     u->source->update_requested_latency = source_update_requested_latency_cb;
1753     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1754     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1755     if (!u->use_volume_sharing) {
1756         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1757         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1758         pa_source_enable_decibel_volume(u->source, TRUE);
1759     }
1760     u->source->userdata = u;
1761
1762     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1763
1764     /* Create sink */
1765     pa_sink_new_data_init(&sink_data);
1766     sink_data.driver = __FILE__;
1767     sink_data.module = m;
1768     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1769         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1770     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1771     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1772     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1773     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1774     if (!u->autoloaded)
1775         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1776
1777     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1778         pa_log("Invalid properties");
1779         pa_sink_new_data_done(&sink_data);
1780         goto fail;
1781     }
1782
1783     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1784         const char *y, *z;
1785
1786         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1787         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1788         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1789                 z ? z : sink_master->name, y ? y : source_master->name);
1790     }
1791
1792     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1793                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1794     pa_sink_new_data_done(&sink_data);
1795
1796     if (!u->sink) {
1797         pa_log("Failed to create sink.");
1798         goto fail;
1799     }
1800
1801     u->sink->parent.process_msg = sink_process_msg_cb;
1802     u->sink->set_state = sink_set_state_cb;
1803     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1804     u->sink->request_rewind = sink_request_rewind_cb;
1805     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1806     if (!u->use_volume_sharing) {
1807         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1808         pa_sink_enable_decibel_volume(u->sink, TRUE);
1809     }
1810     u->sink->userdata = u;
1811
1812     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1813
1814     /* Create source output */
1815     pa_source_output_new_data_init(&source_output_data);
1816     source_output_data.driver = __FILE__;
1817     source_output_data.module = m;
1818     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1819     source_output_data.destination_source = u->source;
1820     /* FIXME
1821        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1822
1823     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1824     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1825     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1826     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1827
1828     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1829     pa_source_output_new_data_done(&source_output_data);
1830
1831     if (!u->source_output)
1832         goto fail;
1833
1834     u->source_output->parent.process_msg = source_output_process_msg_cb;
1835     u->source_output->push = source_output_push_cb;
1836     u->source_output->process_rewind = source_output_process_rewind_cb;
1837     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1838     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1839     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1840     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1841     u->source_output->kill = source_output_kill_cb;
1842     u->source_output->attach = source_output_attach_cb;
1843     u->source_output->detach = source_output_detach_cb;
1844     u->source_output->state_change = source_output_state_change_cb;
1845     u->source_output->may_move_to = source_output_may_move_to_cb;
1846     u->source_output->moving = source_output_moving_cb;
1847     u->source_output->userdata = u;
1848
1849     u->source->output_from_master = u->source_output;
1850
1851     /* Create sink input */
1852     pa_sink_input_new_data_init(&sink_input_data);
1853     sink_input_data.driver = __FILE__;
1854     sink_input_data.module = m;
1855     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1856     sink_input_data.origin_sink = u->sink;
1857     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1858     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1859     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1860     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1861     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1862
1863     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1864     pa_sink_input_new_data_done(&sink_input_data);
1865
1866     if (!u->sink_input)
1867         goto fail;
1868
1869     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1870     u->sink_input->pop = sink_input_pop_cb;
1871     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1872     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1873     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1874     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1875     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1876     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1877     u->sink_input->kill = sink_input_kill_cb;
1878     u->sink_input->attach = sink_input_attach_cb;
1879     u->sink_input->detach = sink_input_detach_cb;
1880     u->sink_input->state_change = sink_input_state_change_cb;
1881     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1882     u->sink_input->moving = sink_input_moving_cb;
1883     if (!u->use_volume_sharing)
1884         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1885     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1886     u->sink_input->userdata = u;
1887
1888     u->sink->input_to_master = u->sink_input;
1889
1890     pa_sink_input_get_silence(u->sink_input, &silence);
1891
1892     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1893         &source_ss, 1, 1, 0, &silence);
1894     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1895         &sink_ss, 1, 1, 0, &silence);
1896
1897     pa_memblock_unref(silence.memblock);
1898
1899     if (!u->source_memblockq || !u->sink_memblockq) {
1900         pa_log("Failed to create memblockq.");
1901         goto fail;
1902     }
1903
1904     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1905         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1906     else if (u->ec->params.drift_compensation) {
1907         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1908         u->adjust_time = 0;
1909         /* Perform resync just once to give the canceller a leg up */
1910         pa_atomic_store(&u->request_resync, 1);
1911     }
1912
1913     if (u->save_aec) {
1914         pa_log("Creating AEC files in /tmp");
1915         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1916         if (u->captured_file == NULL)
1917             perror ("fopen failed");
1918         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1919         if (u->played_file == NULL)
1920             perror ("fopen failed");
1921         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1922         if (u->canceled_file == NULL)
1923             perror ("fopen failed");
1924         if (u->ec->params.drift_compensation) {
1925             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1926             if (u->drift_file == NULL)
1927                 perror ("fopen failed");
1928         }
1929     }
1930
1931     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1932     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1933     u->ec->msg->userdata = u;
1934
1935     u->thread_info.current_volume = u->source->reference_volume;
1936
1937     pa_sink_put(u->sink);
1938     pa_source_put(u->source);
1939
1940     pa_sink_input_put(u->sink_input);
1941     pa_source_output_put(u->source_output);
1942     pa_modargs_free(ma);
1943
1944     return 0;
1945
1946 fail:
1947     if (ma)
1948         pa_modargs_free(ma);
1949
1950     pa__done(m);
1951
1952     return -1;
1953 }
1954
1955 int pa__get_n_used(pa_module *m) {
1956     struct userdata *u;
1957
1958     pa_assert(m);
1959     pa_assert_se(u = m->userdata);
1960
1961     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1962 }
1963
1964 void pa__done(pa_module*m) {
1965     struct userdata *u;
1966
1967     pa_assert(m);
1968
1969     if (!(u = m->userdata))
1970         return;
1971
1972     u->dead = TRUE;
1973
1974     /* See comments in source_output_kill_cb() above regarding
1975      * destruction order! */
1976
1977     if (u->time_event)
1978         u->core->mainloop->time_free(u->time_event);
1979
1980     if (u->source_output)
1981         pa_source_output_unlink(u->source_output);
1982     if (u->sink_input)
1983         pa_sink_input_unlink(u->sink_input);
1984
1985     if (u->source)
1986         pa_source_unlink(u->source);
1987     if (u->sink)
1988         pa_sink_unlink(u->sink);
1989
1990     if (u->source_output)
1991         pa_source_output_unref(u->source_output);
1992     if (u->sink_input)
1993         pa_sink_input_unref(u->sink_input);
1994
1995     if (u->source)
1996         pa_source_unref(u->source);
1997     if (u->sink)
1998         pa_sink_unref(u->sink);
1999
2000     if (u->source_memblockq)
2001         pa_memblockq_free(u->source_memblockq);
2002     if (u->sink_memblockq)
2003         pa_memblockq_free(u->sink_memblockq);
2004
2005     if (u->ec) {
2006         if (u->ec->done)
2007             u->ec->done(u->ec);
2008
2009         pa_xfree(u->ec);
2010     }
2011
2012     if (u->asyncmsgq)
2013         pa_asyncmsgq_unref(u->asyncmsgq);
2014
2015     if (u->save_aec) {
2016         if (u->played_file)
2017             fclose(u->played_file);
2018         if (u->captured_file)
2019             fclose(u->captured_file);
2020         if (u->canceled_file)
2021             fclose(u->canceled_file);
2022         if (u->drift_file)
2023             fclose(u->drift_file);
2024     }
2025
2026     pa_xfree(u);
2027 }
2028
2029 #ifdef ECHO_CANCEL_TEST
2030 /*
2031  * Stand-alone test program for running in the canceller on pre-recorded files.
2032  */
2033 int main(int argc, char* argv[]) {
2034     struct userdata u;
2035     pa_sample_spec source_ss, sink_ss;
2036     pa_channel_map source_map, sink_map;
2037     pa_modargs *ma = NULL;
2038     uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2039     int unused PA_GCC_UNUSED;
2040     int ret = 0, i;
2041     char c;
2042     float drift;
2043
2044     pa_memzero(&u, sizeof(u));
2045
2046     if (argc < 4 || argc > 7) {
2047         goto usage;
2048     }
2049
2050     u.captured_file = fopen(argv[2], "r");
2051     if (u.captured_file == NULL) {
2052         perror ("fopen failed");
2053         goto fail;
2054     }
2055     u.played_file = fopen(argv[1], "r");
2056     if (u.played_file == NULL) {
2057         perror ("fopen failed");
2058         goto fail;
2059     }
2060     u.canceled_file = fopen(argv[3], "wb");
2061     if (u.canceled_file == NULL) {
2062         perror ("fopen failed");
2063         goto fail;
2064     }
2065
2066     u.core = pa_xnew0(pa_core, 1);
2067     u.core->cpu_info.cpu_type = PA_CPU_X86;
2068     u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2069
2070     if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2071         pa_log("Failed to parse module arguments.");
2072         goto fail;
2073     }
2074
2075     source_ss.format = PA_SAMPLE_S16LE;
2076     source_ss.rate = DEFAULT_RATE;
2077     source_ss.channels = DEFAULT_CHANNELS;
2078     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2079
2080     init_common(ma, &u, &source_ss, &source_map);
2081
2082     if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
2083                      (argc > 4) ? argv[5] : NULL )) {
2084         pa_log("Failed to init AEC engine");
2085         goto fail;
2086     }
2087
2088     if (u.ec->params.drift_compensation) {
2089         if (argc < 7) {
2090             pa_log("Drift compensation enabled but drift file not specified");
2091             goto fail;
2092         }
2093
2094         u.drift_file = fopen(argv[6], "r");
2095
2096         if (u.drift_file == NULL) {
2097             perror ("fopen failed");
2098             goto fail;
2099         }
2100     }
2101
2102     rdata = pa_xmalloc(u.blocksize);
2103     pdata = pa_xmalloc(u.blocksize);
2104     cdata = pa_xmalloc(u.blocksize);
2105
2106     if (!u.ec->params.drift_compensation) {
2107         while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
2108             if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
2109                 perror("Played file ended before captured file");
2110                 goto fail;
2111             }
2112
2113             u.ec->run(u.ec, rdata, pdata, cdata);
2114
2115             unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
2116         }
2117     } else {
2118         while (fscanf(u.drift_file, "%c", &c) > 0) {
2119             switch (c) {
2120                 case 'd':
2121                     if (!fscanf(u.drift_file, "%a", &drift)) {
2122                         perror("Drift file incomplete");
2123                         goto fail;
2124                     }
2125
2126                     u.ec->set_drift(u.ec, drift);
2127
2128                     break;
2129
2130                 case 'c':
2131                     if (!fscanf(u.drift_file, "%d", &i)) {
2132                         perror("Drift file incomplete");
2133                         goto fail;
2134                     }
2135
2136                     if (fread(rdata, i, 1, u.captured_file) <= 0) {
2137                         perror("Captured file ended prematurely");
2138                         goto fail;
2139                     }
2140
2141                     u.ec->record(u.ec, rdata, cdata);
2142
2143                     unused = fwrite(cdata, i, 1, u.canceled_file);
2144
2145                     break;
2146
2147                 case 'p':
2148                     if (!fscanf(u.drift_file, "%d", &i)) {
2149                         perror("Drift file incomplete");
2150                         goto fail;
2151                     }
2152
2153                     if (fread(pdata, i, 1, u.played_file) <= 0) {
2154                         perror("Played file ended prematurely");
2155                         goto fail;
2156                     }
2157
2158                     u.ec->play(u.ec, pdata);
2159
2160                     break;
2161             }
2162         }
2163
2164         if (fread(rdata, i, 1, u.captured_file) > 0)
2165             pa_log("All capture data was not consumed");
2166         if (fread(pdata, i, 1, u.played_file) > 0)
2167             pa_log("All playback data was not consumed");
2168     }
2169
2170     u.ec->done(u.ec);
2171
2172     fclose(u.captured_file);
2173     fclose(u.played_file);
2174     fclose(u.canceled_file);
2175     if (u.drift_file)
2176         fclose(u.drift_file);
2177
2178 out:
2179     pa_xfree(rdata);
2180     pa_xfree(pdata);
2181     pa_xfree(cdata);
2182
2183     pa_xfree(u.ec);
2184     pa_xfree(u.core);
2185
2186     if (ma)
2187         pa_modargs_free(ma);
2188
2189     return ret;
2190
2191 usage:
2192     pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2193
2194 fail:
2195     ret = -1;
2196     goto out;
2197 }
2198 #endif /* ECHO_CANCEL_TEST */