11ad1de2974c80fa10b960bde2c12a2e357247f3
[platform/upstream/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
/* Module metadata: author, description, version, load-once policy and the
 * translatable usage string. The argument names listed in the usage string
 * are the same ones accepted through valid_modargs[]. */
PA_MODULE_AUTHOR("Wim Taymans");
PA_MODULE_DESCRIPTION("Echo Cancellation");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(FALSE);
PA_MODULE_USAGE(
        _("source_name=<name for the source> "
          "source_properties=<properties for the source> "
          "source_master=<name of source to filter> "
          "sink_name=<name for the sink> "
          "sink_properties=<properties for the sink> "
          "sink_master=<name of sink to filter> "
          "adjust_time=<how often to readjust rates in s> "
          "adjust_threshold=<how much drift to readjust after in ms> "
          "format=<sample format> "
          "rate=<sample rate> "
          "channels=<number of channels> "
          "channel_map=<channel map> "
          "aec_method=<implementation to use> "
          "aec_args=<parameters for the AEC engine> "
          "save_aec=<save AEC data in /tmp> "
          "autoloaded=<set if this module is being loaded automatically> "
          "use_volume_sharing=<yes or no> "
        ));
81
/* NOTE: Make sure the enum and ec_table are maintained in the correct order */
/* Identifiers for the echo canceller implementations compiled into this
 * module. Apart from PA_ECHO_CANCELLER_INVALID (-1), the enumerators are
 * declared in the same order, and under the same HAVE_* conditionals, as the
 * entries of ec_table[]. */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,
    PA_ECHO_CANCELLER_NULL,
#ifdef HAVE_SPEEX
    PA_ECHO_CANCELLER_SPEEX,
#endif
#ifdef HAVE_ADRIAN_EC
    PA_ECHO_CANCELLER_ADRIAN,
#endif
#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC,
#endif
} pa_echo_canceller_method_t;
96
/* Name of the canceller used by default: "webrtc" when built with WebRTC
 * support, "speex" otherwise.
 * NOTE(review): the fallback is "speex" even when HAVE_SPEEX is not defined;
 * confirm how the argument parser handles that configuration. */
#ifdef HAVE_WEBRTC
#define DEFAULT_ECHO_CANCELLER "webrtc"
#else
#define DEFAULT_ECHO_CANCELLER "speex"
#endif
102
/* Table of the available echo canceller implementations, one entry per
 * pa_echo_canceller_method_t value (presumably indexed by that enum; the
 * two must be kept in the same order).
 *
 * Every entry provides init/run/done. Only the WebRTC entry additionally
 * provides play/record/set_drift, which are the hooks used by
 * do_push_drift_comp(). */
static const pa_echo_canceller ec_table[] = {
    {
        /* Null, Dummy echo canceller (just copies data) */
        .init                   = pa_null_ec_init,
        .run                    = pa_null_ec_run,
        .done                   = pa_null_ec_done,
    },
#ifdef HAVE_SPEEX
    {
        /* Speex */
        .init                   = pa_speex_ec_init,
        .run                    = pa_speex_ec_run,
        .done                   = pa_speex_ec_done,
    },
#endif
#ifdef HAVE_ADRIAN_EC
    {
        /* Adrian Andre's NLMS implementation */
        .init                   = pa_adrian_ec_init,
        .run                    = pa_adrian_ec_run,
        .done                   = pa_adrian_ec_done,
    },
#endif
#ifdef HAVE_WEBRTC
    {
        /* WebRTC's audio processing engine */
        .init                   = pa_webrtc_ec_init,
        .play                   = pa_webrtc_ec_play,
        .record                 = pa_webrtc_ec_record,
        .set_drift              = pa_webrtc_ec_set_drift,
        .run                    = pa_webrtc_ec_run,
        .done                   = pa_webrtc_ec_done,
    },
#endif
};
138
/* Defaults for the module arguments (rate presumably in Hz -- confirm
 * against the argument parsing code). */
#define DEFAULT_RATE 32000
#define DEFAULT_CHANNELS 1
/* Default interval between two latency checks: one second, in usec */
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
/* Default drift tolerance: five milliseconds, in usec */
#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
#define DEFAULT_SAVE_AEC FALSE
#define DEFAULT_AUTOLOADED FALSE

/* 16 Mi; presumably the maximum length in bytes of our memblockqs -- the
 * queue construction is not visible here, confirm. */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)

/* Can only be used in main context */
/* True when both our virtual source and our virtual sink are RUNNING. */
#define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
                      (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
151
/* This module creates a new (virtual) source and sink.
 *
 * The data sent to the new sink is kept in a memblockq before being
 * forwarded to the real sink_master.
 *
 * Data read from source_master is matched against the saved sink data and
 * echo canceled data is then pushed onto the new source.
 *
 * Both source and sink masters have their own threads to push/pull data
 * respectively. We however perform all our actions in the source IO thread.
 * To do this we send all played samples to the source IO thread where they
 * are then pushed into the memblockq.
 *
 * Alignment is performed in two steps:
 *
 * 1) when something happens that requires quick adjustment of the alignment of
 *    capture and playback samples, we perform a resync. This adjusts the
 *    position in the playback memblock to the requested sample. Quick
 *    adjustments include moving the playback samples before the capture
 *    samples (because else the echo canceler does not work) or when the
 *    playback pointer drifts too far away.
 *
 * 2) periodically check the difference between capture and playback. We use a
 *    low and high watermark for adjusting the alignment. Playback should always
 *    be before capture and the difference should not be bigger than one frame
 *    size. We would ideally like to resample the sink_input but most drivers
 *    don't give enough accuracy to be able to do that right now.
 */

struct userdata;
180
181 struct userdata;
182
/* Message object carrying a back pointer to the module's userdata.
 * PA_DEFINE_PRIVATE_CLASS declares it as a private subclass of pa_msgobject,
 * and PA_ECHO_CANCELLER_MSG() casts an object to this type. */
struct pa_echo_canceller_msg {
    pa_msgobject parent;
    struct userdata *userdata;
};

PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
190
/* Latency snapshot of the playback and the capture side, consumed by
 * calc_diff(). The *_now and *_latency members are times in usec; the
 * *_delay, *_counter, rlen and plen members are byte counts that calc_diff()
 * converts to usec using the sink input / source output sample specs. */
struct snapshot {
    /* playback side */
    pa_usec_t sink_now;
    pa_usec_t sink_latency;
    size_t sink_delay;
    int64_t send_counter;

    /* capture side */
    pa_usec_t source_now;
    pa_usec_t source_latency;
    size_t source_delay;
    int64_t recv_counter;
    /* rlen is converted with the source output spec and plen with the sink
     * input spec in calc_diff() */
    size_t rlen;
    size_t plen;
};
204
/* Per-module-instance state, shared between the main thread and the
 * source/sink I/O threads. */
struct userdata {
    pa_core *core;
    pa_module *module;

    pa_bool_t autoloaded;
    pa_bool_t dead;
    /* when set, the push functions dump audio/drift data to the FILE
     * handles below */
    pa_bool_t save_aec;

    pa_echo_canceller *ec;
    /* number of bytes of capture/playback data handed to the canceller per
     * iteration */
    uint32_t source_blocksize;
    uint32_t sink_blocksize;

    pa_bool_t need_realign;

    /* to wakeup the source I/O thread */
    pa_asyncmsgq *asyncmsgq;
    pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;

    pa_source *source;
    pa_bool_t source_auto_desc;
    pa_source_output *source_output;
    pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
    /* bytes of capture data to drop; set by apply_diff_time() */
    size_t source_skip;

    pa_sink *sink;
    pa_bool_t sink_auto_desc;
    pa_sink_input *sink_input;
    pa_memblockq *sink_memblockq;
    int64_t send_counter;          /* updated in sink IO thread */
    int64_t recv_counter;
    /* bytes of playback data to drop; set by apply_diff_time() */
    size_t sink_skip;

    /* Bytes left over from previous iteration */
    size_t sink_rem;
    size_t source_rem;

    /* set to 1 by the state callbacks when the source or sink starts
     * running; consumed in the source output push callback */
    pa_atomic_t request_resync;

    pa_time_event *time_event;
    /* interval between two runs of time_callback(), in usec */
    pa_usec_t adjust_time;
    /* drift (as returned by calc_diff()) above which time_callback()
     * requests an immediate adjustment */
    int adjust_threshold;

    /* debug dump files, only written when save_aec is set */
    FILE *captured_file;
    FILE *played_file;
    FILE *canceled_file;
    FILE *drift_file;

    pa_bool_t use_volume_sharing;

    struct {
        /* copy of the source's reference volume, refreshed on
         * PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED */
        pa_cvolume current_volume;
    } thread_info;
};
258
/* Forward declaration: do_resync() calls this before its definition. */
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
260
/* NULL-terminated list of the module argument names this module accepts.
 * It contains the same keys that PA_MODULE_USAGE advertises. */
static const char* const valid_modargs[] = {
    "source_name",
    "source_properties",
    "source_master",
    "sink_name",
    "sink_properties",
    "sink_master",
    "adjust_time",
    "adjust_threshold",
    "format",
    "rate",
    "channels",
    "channel_map",
    "aec_method",
    "aec_args",
    "save_aec",
    "autoloaded",
    "use_volume_sharing",
    NULL
};
281
/* Private message codes addressed to our source output. Numbering starts at
 * PA_SOURCE_OUTPUT_MESSAGE_MAX, i.e. after the codes defined by the core
 * for source outputs. */
enum {
    SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
    SOURCE_OUTPUT_MESSAGE_REWIND,
    /* sent by time_callback() to fill in the capture half of a snapshot */
    SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
    /* posted by time_callback() with the measured drift as offset */
    SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
};
288
289 enum {
290     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
291 };
292
/* Message codes for the echo canceller message channel (presumably the
 * pa_echo_canceller_msg object -- the handler is not visible here). */
enum {
    ECHO_CANCELLER_MESSAGE_SET_VOLUME,
};
296
297 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
298     int64_t diff_time, buffer_latency;
299     pa_usec_t plen, rlen, source_delay, sink_delay, recv_counter, send_counter;
300
301     /* get latency difference between playback and record */
302     plen = pa_bytes_to_usec(snapshot->plen, &u->sink_input->sample_spec);
303     rlen = pa_bytes_to_usec(snapshot->rlen, &u->source_output->sample_spec);
304     if (plen > rlen)
305         buffer_latency = plen - rlen;
306     else
307         buffer_latency = 0;
308
309     source_delay = pa_bytes_to_usec(snapshot->source_delay, &u->source_output->sample_spec);
310     sink_delay = pa_bytes_to_usec(snapshot->sink_delay, &u->sink_input->sample_spec);
311     buffer_latency += source_delay + sink_delay;
312
313     /* add the latency difference due to samples not yet transferred */
314     send_counter = pa_bytes_to_usec(snapshot->send_counter, &u->sink_input->sample_spec);
315     recv_counter = pa_bytes_to_usec(snapshot->recv_counter, &u->source_output->sample_spec);
316     if (recv_counter <= send_counter)
317         buffer_latency += (int64_t) (send_counter - recv_counter);
318     else
319         buffer_latency += PA_CLIP_SUB(buffer_latency, (int64_t) (recv_counter - send_counter));
320
321     /* capture and playback are perfectly aligned when diff_time is 0 */
322     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
323           (snapshot->source_now - snapshot->source_latency);
324
325     pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
326         (long long) snapshot->sink_latency,
327         (long long) buffer_latency, (long long) snapshot->source_latency,
328         (long long) source_delay, (long long) sink_delay,
329         (long long) (send_counter - recv_counter),
330         (long long) (snapshot->sink_now - snapshot->source_now));
331
332     return diff_time;
333 }
334
335 /* Called from main context */
336 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
337     struct userdata *u = userdata;
338     uint32_t old_rate, base_rate, new_rate;
339     int64_t diff_time;
340     /*size_t fs*/
341     struct snapshot latency_snapshot;
342
343     pa_assert(u);
344     pa_assert(a);
345     pa_assert(u->time_event == e);
346     pa_assert_ctl_context();
347
348     if (!IS_ACTIVE(u))
349         return;
350
351     /* update our snapshots */
352     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
353     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
354
355     /* calculate drift between capture and playback */
356     diff_time = calc_diff(u, &latency_snapshot);
357
358     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
359     old_rate = u->sink_input->sample_spec.rate;
360     base_rate = u->source_output->sample_spec.rate;
361
362     if (diff_time < 0) {
363         /* recording before playback, we need to adjust quickly. The echo
364          * canceler does not work in this case. */
365         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
366             NULL, diff_time, NULL, NULL);
367         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
368         new_rate = base_rate;
369     }
370     else {
371         if (diff_time > u->adjust_threshold) {
372             /* diff too big, quickly adjust */
373             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
374                 NULL, diff_time, NULL, NULL);
375         }
376
377         /* recording behind playback, we need to slowly adjust the rate to match */
378         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
379
380         /* assume equal samplerates for now */
381         new_rate = base_rate;
382     }
383
384     /* make sure we don't make too big adjustments because that sounds horrible */
385     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
386         new_rate = base_rate;
387
388     if (new_rate != old_rate) {
389         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
390
391         pa_sink_input_set_rate(u->sink_input, new_rate);
392     }
393
394     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
395 }
396
397 /* Called from source I/O thread context */
398 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
399     struct userdata *u = PA_SOURCE(o)->userdata;
400
401     switch (code) {
402
403         case PA_SOURCE_MESSAGE_GET_LATENCY:
404
405             /* The source is _put() before the source output is, so let's
406              * make sure we don't access it in that time. Also, the
407              * source output is first shut down, the source second. */
408             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
409                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
410                 *((pa_usec_t*) data) = 0;
411                 return 0;
412             }
413
414             *((pa_usec_t*) data) =
415
416                 /* Get the latency of the master source */
417                 pa_source_get_latency_within_thread(u->source_output->source) +
418                 /* Add the latency internal to our source output on top */
419                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
420                 /* and the buffering we do on the source */
421                 pa_bytes_to_usec(u->source_blocksize, &u->source_output->source->sample_spec);
422
423             return 0;
424
425         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
426             u->thread_info.current_volume = u->source->reference_volume;
427             break;
428     }
429
430     return pa_source_process_msg(o, code, data, offset, chunk);
431 }
432
433 /* Called from sink I/O thread context */
434 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
435     struct userdata *u = PA_SINK(o)->userdata;
436
437     switch (code) {
438
439         case PA_SINK_MESSAGE_GET_LATENCY:
440
441             /* The sink is _put() before the sink input is, so let's
442              * make sure we don't access it in that time. Also, the
443              * sink input is first shut down, the sink second. */
444             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
445                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
446                 *((pa_usec_t*) data) = 0;
447                 return 0;
448             }
449
450             *((pa_usec_t*) data) =
451
452                 /* Get the latency of the master sink */
453                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
454
455                 /* Add the latency internal to our sink input on top */
456                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
457
458             return 0;
459     }
460
461     return pa_sink_process_msg(o, code, data, offset, chunk);
462 }
463
464
465 /* Called from main context */
466 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
467     struct userdata *u;
468
469     pa_source_assert_ref(s);
470     pa_assert_se(u = s->userdata);
471
472     if (!PA_SOURCE_IS_LINKED(state) ||
473         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
474         return 0;
475
476     if (state == PA_SOURCE_RUNNING) {
477         /* restart timer when both sink and source are active */
478         if (IS_ACTIVE(u) && u->adjust_time)
479             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
480
481         pa_atomic_store(&u->request_resync, 1);
482         pa_source_output_cork(u->source_output, FALSE);
483     } else if (state == PA_SOURCE_SUSPENDED) {
484         pa_source_output_cork(u->source_output, TRUE);
485     }
486
487     return 0;
488 }
489
490 /* Called from main context */
491 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
492     struct userdata *u;
493
494     pa_sink_assert_ref(s);
495     pa_assert_se(u = s->userdata);
496
497     if (!PA_SINK_IS_LINKED(state) ||
498         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
499         return 0;
500
501     if (state == PA_SINK_RUNNING) {
502         /* restart timer when both sink and source are active */
503         if (IS_ACTIVE(u) && u->adjust_time)
504             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
505
506         pa_atomic_store(&u->request_resync, 1);
507         pa_sink_input_cork(u->sink_input, FALSE);
508     } else if (state == PA_SINK_SUSPENDED) {
509         pa_sink_input_cork(u->sink_input, TRUE);
510     }
511
512     return 0;
513 }
514
515 /* Called from source I/O thread context */
516 static void source_update_requested_latency_cb(pa_source *s) {
517     struct userdata *u;
518
519     pa_source_assert_ref(s);
520     pa_assert_se(u = s->userdata);
521
522     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
523         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
524         return;
525
526     pa_log_debug("Source update requested latency");
527
528     /* Just hand this one over to the master source */
529     pa_source_output_set_requested_latency_within_thread(
530             u->source_output,
531             pa_source_get_requested_latency_within_thread(s));
532 }
533
534 /* Called from sink I/O thread context */
535 static void sink_update_requested_latency_cb(pa_sink *s) {
536     struct userdata *u;
537
538     pa_sink_assert_ref(s);
539     pa_assert_se(u = s->userdata);
540
541     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
542         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
543         return;
544
545     pa_log_debug("Sink update requested latency");
546
547     /* Just hand this one over to the master sink */
548     pa_sink_input_set_requested_latency_within_thread(
549             u->sink_input,
550             pa_sink_get_requested_latency_within_thread(s));
551 }
552
553 /* Called from sink I/O thread context */
554 static void sink_request_rewind_cb(pa_sink *s) {
555     struct userdata *u;
556
557     pa_sink_assert_ref(s);
558     pa_assert_se(u = s->userdata);
559
560     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
561         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
562         return;
563
564     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
565
566     /* Just hand this one over to the master sink */
567     pa_sink_input_request_rewind(u->sink_input,
568                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
569 }
570
571 /* Called from main context */
572 static void source_set_volume_cb(pa_source *s) {
573     struct userdata *u;
574
575     pa_source_assert_ref(s);
576     pa_assert_se(u = s->userdata);
577
578     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
579         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
580         return;
581
582     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
583 }
584
585 /* Called from main context */
586 static void sink_set_volume_cb(pa_sink *s) {
587     struct userdata *u;
588
589     pa_sink_assert_ref(s);
590     pa_assert_se(u = s->userdata);
591
592     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
593         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
594         return;
595
596     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
597 }
598
599 /* Called from main context. */
600 static void source_get_volume_cb(pa_source *s) {
601     struct userdata *u;
602     pa_cvolume v;
603
604     pa_source_assert_ref(s);
605     pa_assert_se(u = s->userdata);
606
607     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
608         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
609         return;
610
611     pa_source_output_get_volume(u->source_output, &v, TRUE);
612
613     if (pa_cvolume_equal(&s->real_volume, &v))
614         /* no change */
615         return;
616
617     s->real_volume = v;
618     pa_source_set_soft_volume(s, NULL);
619 }
620
621 /* Called from main context */
622 static void source_set_mute_cb(pa_source *s) {
623     struct userdata *u;
624
625     pa_source_assert_ref(s);
626     pa_assert_se(u = s->userdata);
627
628     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
629         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
630         return;
631
632     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
633 }
634
635 /* Called from main context */
636 static void sink_set_mute_cb(pa_sink *s) {
637     struct userdata *u;
638
639     pa_sink_assert_ref(s);
640     pa_assert_se(u = s->userdata);
641
642     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
643         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
644         return;
645
646     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
647 }
648
649 /* Called from main context */
650 static void source_get_mute_cb(pa_source *s) {
651     struct userdata *u;
652
653     pa_source_assert_ref(s);
654     pa_assert_se(u = s->userdata);
655
656     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
657         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
658         return;
659
660     pa_source_output_get_mute(u->source_output);
661 }
662
663 /* Called from source I/O thread context. */
664 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
665     int64_t diff;
666
667     if (diff_time < 0) {
668         diff = pa_usec_to_bytes(-diff_time, &u->sink_input->sample_spec);
669
670         if (diff > 0) {
671             /* add some extra safety samples to compensate for jitter in the
672              * timings */
673             diff += 10 * pa_frame_size (&u->sink_input->sample_spec);
674
675             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
676
677             u->sink_skip = diff;
678             u->source_skip = 0;
679         }
680     } else if (diff_time > 0) {
681         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
682
683         if (diff > 0) {
684             pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
685
686             u->source_skip = diff;
687             u->sink_skip = 0;
688         }
689     }
690 }
691
692 /* Called from source I/O thread context. */
693 static void do_resync(struct userdata *u) {
694     int64_t diff_time;
695     struct snapshot latency_snapshot;
696
697     pa_log("Doing resync");
698
699     /* update our snapshot */
700     source_output_snapshot_within_thread(u, &latency_snapshot);
701     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
702
703     /* calculate drift between capture and playback */
704     diff_time = calc_diff(u, &latency_snapshot);
705
706     /* and adjust for the drift */
707     apply_diff_time(u, diff_time);
708 }
709
710 /* 1. Calculate drift at this point, pass to canceller
711  * 2. Push out playback samples in blocksize chunks
712  * 3. Push out capture samples in blocksize chunks
713  * 4. ???
714  * 5. Profit
715  *
716  * Called from source I/O thread context.
717  */
718 static void do_push_drift_comp(struct userdata *u) {
719     size_t rlen, plen;
720     pa_memchunk rchunk, pchunk, cchunk;
721     uint8_t *rdata, *pdata, *cdata;
722     float drift;
723     int unused PA_GCC_UNUSED;
724
725     rlen = pa_memblockq_get_length(u->source_memblockq);
726     plen = pa_memblockq_get_length(u->sink_memblockq);
727
728     /* Estimate snapshot drift as follows:
729      *   pd: amount of data consumed since last time
730      *   rd: amount of data consumed since last time
731      *
732      *   drift = (pd - rd) / rd;
733      *
734      * We calculate pd and rd as the memblockq length less the number of
735      * samples left from the last iteration (to avoid double counting
736      * those remainder samples.
737      */
738     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
739     u->sink_rem = plen % u->sink_blocksize;
740     u->source_rem = rlen % u->source_blocksize;
741
742     /* Now let the canceller work its drift compensation magic */
743     u->ec->set_drift(u->ec, drift);
744
745     if (u->save_aec) {
746         if (u->drift_file)
747             fprintf(u->drift_file, "d %a\n", drift);
748     }
749
750     /* Send in the playback samples first */
751     while (plen >= u->sink_blocksize) {
752         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
753         pdata = pa_memblock_acquire(pchunk.memblock);
754         pdata += pchunk.index;
755
756         u->ec->play(u->ec, pdata);
757
758         if (u->save_aec) {
759             if (u->drift_file)
760                 fprintf(u->drift_file, "p %d\n", u->sink_blocksize);
761             if (u->played_file)
762                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
763         }
764
765         pa_memblock_release(pchunk.memblock);
766         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
767         pa_memblock_unref(pchunk.memblock);
768
769         plen -= u->sink_blocksize;
770     }
771
772     /* And now the capture samples */
773     while (rlen >= u->source_blocksize) {
774         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_blocksize, &rchunk);
775
776         rdata = pa_memblock_acquire(rchunk.memblock);
777         rdata += rchunk.index;
778
779         cchunk.index = 0;
780         cchunk.length = u->source_blocksize;
781         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
782         cdata = pa_memblock_acquire(cchunk.memblock);
783
784         u->ec->record(u->ec, rdata, cdata);
785
786         if (u->save_aec) {
787             if (u->drift_file)
788                 fprintf(u->drift_file, "c %d\n", u->source_blocksize);
789             if (u->captured_file)
790                 unused = fwrite(rdata, 1, u->source_blocksize, u->captured_file);
791             if (u->canceled_file)
792                 unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
793         }
794
795         pa_memblock_release(cchunk.memblock);
796         pa_memblock_release(rchunk.memblock);
797
798         pa_memblock_unref(rchunk.memblock);
799
800         pa_source_post(u->source, &cchunk);
801         pa_memblock_unref(cchunk.memblock);
802
803         pa_memblockq_drop(u->source_memblockq, u->source_blocksize);
804         rlen -= u->source_blocksize;
805     }
806 }
807
808 /* This one's simpler than the drift compensation case -- we just iterate over
809  * the capture buffer, and pass the canceller blocksize bytes of playback and
810  * capture data.
811  *
812  * Called from source I/O thread context. */
813 static void do_push(struct userdata *u) {
814     size_t rlen, plen;
815     pa_memchunk rchunk, pchunk, cchunk;
816     uint8_t *rdata, *pdata, *cdata;
817     int unused PA_GCC_UNUSED;
818
819     rlen = pa_memblockq_get_length(u->source_memblockq);
820     plen = pa_memblockq_get_length(u->sink_memblockq);
821
822     while (rlen >= u->source_blocksize) {
823         /* take fixed block from recorded samples */
824         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_blocksize, &rchunk);
825
826         if (plen >= u->sink_blocksize) {
827             /* take fixed block from played samples */
828             pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
829
830             rdata = pa_memblock_acquire(rchunk.memblock);
831             rdata += rchunk.index;
832             pdata = pa_memblock_acquire(pchunk.memblock);
833             pdata += pchunk.index;
834
835             cchunk.index = 0;
836             cchunk.length = u->source_blocksize;
837             cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
838             cdata = pa_memblock_acquire(cchunk.memblock);
839
840             if (u->save_aec) {
841                 if (u->captured_file)
842                     unused = fwrite(rdata, 1, u->source_blocksize, u->captured_file);
843                 if (u->played_file)
844                     unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
845             }
846
847             /* perform echo cancellation */
848             u->ec->run(u->ec, rdata, pdata, cdata);
849
850             if (u->save_aec) {
851                 if (u->canceled_file)
852                     unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
853             }
854
855             pa_memblock_release(cchunk.memblock);
856             pa_memblock_release(pchunk.memblock);
857             pa_memblock_release(rchunk.memblock);
858
859             /* drop consumed sink samples */
860             pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
861             pa_memblock_unref(pchunk.memblock);
862
863             pa_memblock_unref(rchunk.memblock);
864             /* the filtered samples now become the samples from our
865              * source */
866             rchunk = cchunk;
867
868             plen -= u->sink_blocksize;
869         }
870
871         /* forward the (echo-canceled) data to the virtual source */
872         pa_source_post(u->source, &rchunk);
873         pa_memblock_unref(rchunk.memblock);
874
875         pa_memblockq_drop(u->source_memblockq, u->source_blocksize);
876         rlen -= u->source_blocksize;
877     }
878 }
879
880 /* Called from source I/O thread context. */
881 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
882     struct userdata *u;
883     size_t rlen, plen, to_skip;
884     pa_memchunk rchunk;
885
886     pa_source_output_assert_ref(o);
887     pa_source_output_assert_io_context(o);
888     pa_assert_se(u = o->userdata);
889
890     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
891         pa_log("Push when no link?");
892         return;
893     }
894
895     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
896                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
897         pa_source_post(u->source, chunk);
898         return;
899     }
900
901     /* handle queued messages, do any message sending of our own */
902     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
903         ;
904
905     pa_memblockq_push_align(u->source_memblockq, chunk);
906
907     rlen = pa_memblockq_get_length(u->source_memblockq);
908     plen = pa_memblockq_get_length(u->sink_memblockq);
909
910     /* Let's not do anything else till we have enough data to process */
911     if (rlen < u->source_blocksize)
912         return;
913
914     /* See if we need to drop samples in order to sync */
915     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
916         do_resync(u);
917     }
918
919     /* Okay, skip cancellation for skipped source samples if needed. */
920     if (PA_UNLIKELY(u->source_skip)) {
921         /* The slightly tricky bit here is that we drop all but modulo
922          * blocksize bytes and then adjust for that last bit on the sink side.
923          * We do this because the source data is coming at a fixed rate, which
924          * means the only way to try to catch up is drop sink samples and let
925          * the canceller cope up with this. */
926         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
927         to_skip -= to_skip % u->source_blocksize;
928
929         if (to_skip) {
930             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
931             pa_source_post(u->source, &rchunk);
932
933             pa_memblock_unref(rchunk.memblock);
934             pa_memblockq_drop(u->source_memblockq, to_skip);
935
936             rlen -= to_skip;
937             u->source_skip -= to_skip;
938         }
939
940         if (rlen && u->source_skip % u->source_blocksize) {
941             u->sink_skip += (uint64_t) (u->source_blocksize - (u->source_skip % u->source_blocksize)) * u->sink_blocksize / u->source_blocksize;
942             u->source_skip -= (u->source_skip % u->source_blocksize);
943         }
944     }
945
946     /* And for the sink, these samples have been played back already, so we can
947      * just drop them and get on with it. */
948     if (PA_UNLIKELY(u->sink_skip)) {
949         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
950
951         pa_memblockq_drop(u->sink_memblockq, to_skip);
952
953         plen -= to_skip;
954         u->sink_skip -= to_skip;
955     }
956
957     /* process and push out samples */
958     if (u->ec->params.drift_compensation)
959         do_push_drift_comp(u);
960     else
961         do_push(u);
962 }
963
964 /* Called from sink I/O thread context. */
965 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
966     struct userdata *u;
967
968     pa_sink_input_assert_ref(i);
969     pa_assert(chunk);
970     pa_assert_se(u = i->userdata);
971
972     if (u->sink->thread_info.rewind_requested)
973         pa_sink_process_rewind(u->sink, 0);
974
975     pa_sink_render_full(u->sink, nbytes, chunk);
976
977     if (i->thread_info.underrun_for > 0) {
978         pa_log_debug("Handling end of underrun.");
979         pa_atomic_store(&u->request_resync, 1);
980     }
981
982     /* let source thread handle the chunk. pass the sample count as well so that
983      * the source IO thread can update the right variables. */
984     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
985         NULL, 0, chunk, NULL);
986     u->send_counter += chunk->length;
987
988     return 0;
989 }
990
991 /* Called from source I/O thread context. */
992 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
993     struct userdata *u;
994
995     pa_source_output_assert_ref(o);
996     pa_source_output_assert_io_context(o);
997     pa_assert_se(u = o->userdata);
998
999     pa_source_process_rewind(u->source, nbytes);
1000
1001     /* go back on read side, we need to use older sink data for this */
1002     pa_memblockq_rewind(u->sink_memblockq, nbytes);
1003
1004     /* manipulate write index */
1005     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
1006
1007     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
1008         (long long) pa_memblockq_get_length (u->source_memblockq));
1009 }
1010
1011 /* Called from sink I/O thread context. */
1012 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1013     struct userdata *u;
1014
1015     pa_sink_input_assert_ref(i);
1016     pa_assert_se(u = i->userdata);
1017
1018     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1019
1020     pa_sink_process_rewind(u->sink, nbytes);
1021
1022     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1023     u->send_counter -= nbytes;
1024 }
1025
1026 /* Called from source I/O thread context. */
1027 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1028     size_t delay, rlen, plen;
1029     pa_usec_t now, latency;
1030
1031     now = pa_rtclock_now();
1032     latency = pa_source_get_latency_within_thread(u->source_output->source);
1033     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1034
1035     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1036     rlen = pa_memblockq_get_length(u->source_memblockq);
1037     plen = pa_memblockq_get_length(u->sink_memblockq);
1038
1039     snapshot->source_now = now;
1040     snapshot->source_latency = latency;
1041     snapshot->source_delay = delay;
1042     snapshot->recv_counter = u->recv_counter;
1043     snapshot->rlen = rlen + u->sink_skip;
1044     snapshot->plen = plen + u->source_skip;
1045 }
1046
1047 /* Called from source I/O thread context. */
1048 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1049     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1050
1051     switch (code) {
1052
1053         case SOURCE_OUTPUT_MESSAGE_POST:
1054
1055             pa_source_output_assert_io_context(u->source_output);
1056
1057             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1058                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1059             else
1060                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1061
1062             u->recv_counter += (int64_t) chunk->length;
1063
1064             return 0;
1065
1066         case SOURCE_OUTPUT_MESSAGE_REWIND:
1067             pa_source_output_assert_io_context(u->source_output);
1068
1069             /* manipulate write index, never go past what we have */
1070             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1071                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1072             else
1073                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1074
1075             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1076
1077             u->recv_counter -= offset;
1078
1079             return 0;
1080
1081         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1082             struct snapshot *snapshot = (struct snapshot *) data;
1083
1084             source_output_snapshot_within_thread(u, snapshot);
1085             return 0;
1086         }
1087
1088         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1089             apply_diff_time(u, offset);
1090             return 0;
1091
1092     }
1093
1094     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1095 }
1096
1097 /* Called from sink I/O thread context. */
1098 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1099     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1100
1101     switch (code) {
1102
1103         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1104             size_t delay;
1105             pa_usec_t now, latency;
1106             struct snapshot *snapshot = (struct snapshot *) data;
1107
1108             pa_sink_input_assert_io_context(u->sink_input);
1109
1110             now = pa_rtclock_now();
1111             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1112             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1113
1114             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1115
1116             snapshot->sink_now = now;
1117             snapshot->sink_latency = latency;
1118             snapshot->sink_delay = delay;
1119             snapshot->send_counter = u->send_counter;
1120             return 0;
1121         }
1122     }
1123
1124     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1125 }
1126
1127 /* Called from sink I/O thread context. */
1128 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1129     struct userdata *u;
1130
1131     pa_sink_input_assert_ref(i);
1132     pa_assert_se(u = i->userdata);
1133
1134     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1135
1136     /* FIXME: Too small max_rewind:
1137      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1138     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1139     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1140 }
1141
1142 /* Called from source I/O thread context. */
1143 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1144     struct userdata *u;
1145
1146     pa_source_output_assert_ref(o);
1147     pa_assert_se(u = o->userdata);
1148
1149     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1150
1151     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1152 }
1153
1154 /* Called from sink I/O thread context. */
1155 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1156     struct userdata *u;
1157
1158     pa_sink_input_assert_ref(i);
1159     pa_assert_se(u = i->userdata);
1160
1161     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1162
1163     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1164 }
1165
1166 /* Called from sink I/O thread context. */
1167 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1168     struct userdata *u;
1169     pa_usec_t latency;
1170
1171     pa_sink_input_assert_ref(i);
1172     pa_assert_se(u = i->userdata);
1173
1174     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1175
1176     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1177 }
1178
1179 /* Called from source I/O thread context. */
1180 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1181     struct userdata *u;
1182     pa_usec_t latency;
1183
1184     pa_source_output_assert_ref(o);
1185     pa_assert_se(u = o->userdata);
1186
1187     latency = pa_source_get_requested_latency_within_thread(o->source);
1188
1189     pa_log_debug("Source output update requested latency %lld", (long long) latency);
1190 }
1191
1192 /* Called from sink I/O thread context. */
1193 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1194     struct userdata *u;
1195
1196     pa_sink_input_assert_ref(i);
1197     pa_assert_se(u = i->userdata);
1198
1199     pa_log_debug("Sink input update latency range %lld %lld",
1200         (long long) i->sink->thread_info.min_latency,
1201         (long long) i->sink->thread_info.max_latency);
1202
1203     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1204 }
1205
1206 /* Called from source I/O thread context. */
1207 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1208     struct userdata *u;
1209
1210     pa_source_output_assert_ref(o);
1211     pa_assert_se(u = o->userdata);
1212
1213     pa_log_debug("Source output update latency range %lld %lld",
1214         (long long) o->source->thread_info.min_latency,
1215         (long long) o->source->thread_info.max_latency);
1216
1217     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1218 }
1219
1220 /* Called from sink I/O thread context. */
1221 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1222     struct userdata *u;
1223
1224     pa_sink_input_assert_ref(i);
1225     pa_assert_se(u = i->userdata);
1226
1227     pa_log_debug("Sink input update fixed latency %lld",
1228         (long long) i->sink->thread_info.fixed_latency);
1229
1230     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1231 }
1232
1233 /* Called from source I/O thread context. */
1234 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1235     struct userdata *u;
1236
1237     pa_source_output_assert_ref(o);
1238     pa_assert_se(u = o->userdata);
1239
1240     pa_log_debug("Source output update fixed latency %lld",
1241         (long long) o->source->thread_info.fixed_latency);
1242
1243     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1244 }
1245
1246 /* Called from source I/O thread context. */
1247 static void source_output_attach_cb(pa_source_output *o) {
1248     struct userdata *u;
1249
1250     pa_source_output_assert_ref(o);
1251     pa_source_output_assert_io_context(o);
1252     pa_assert_se(u = o->userdata);
1253
1254     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1255     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1256     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1257     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1258
1259     pa_log_debug("Source output %d attach", o->index);
1260
1261     pa_source_attach_within_thread(u->source);
1262
1263     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1264             o->source->thread_info.rtpoll,
1265             PA_RTPOLL_LATE,
1266             u->asyncmsgq);
1267 }
1268
1269 /* Called from sink I/O thread context. */
1270 static void sink_input_attach_cb(pa_sink_input *i) {
1271     struct userdata *u;
1272
1273     pa_sink_input_assert_ref(i);
1274     pa_assert_se(u = i->userdata);
1275
1276     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1277     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1278
1279     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1280      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1281     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1282
1283     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1284      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1285      * HERE. SEE (6) */
1286     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1287
1288     /* FIXME: Too small max_rewind:
1289      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1290     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1291
1292     pa_log_debug("Sink input %d attach", i->index);
1293
1294     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1295             i->sink->thread_info.rtpoll,
1296             PA_RTPOLL_LATE,
1297             u->asyncmsgq);
1298
1299     pa_sink_attach_within_thread(u->sink);
1300 }
1301
1302
1303 /* Called from source I/O thread context. */
1304 static void source_output_detach_cb(pa_source_output *o) {
1305     struct userdata *u;
1306
1307     pa_source_output_assert_ref(o);
1308     pa_source_output_assert_io_context(o);
1309     pa_assert_se(u = o->userdata);
1310
1311     pa_source_detach_within_thread(u->source);
1312     pa_source_set_rtpoll(u->source, NULL);
1313
1314     pa_log_debug("Source output %d detach", o->index);
1315
1316     if (u->rtpoll_item_read) {
1317         pa_rtpoll_item_free(u->rtpoll_item_read);
1318         u->rtpoll_item_read = NULL;
1319     }
1320 }
1321
1322 /* Called from sink I/O thread context. */
1323 static void sink_input_detach_cb(pa_sink_input *i) {
1324     struct userdata *u;
1325
1326     pa_sink_input_assert_ref(i);
1327     pa_assert_se(u = i->userdata);
1328
1329     pa_sink_detach_within_thread(u->sink);
1330
1331     pa_sink_set_rtpoll(u->sink, NULL);
1332
1333     pa_log_debug("Sink input %d detach", i->index);
1334
1335     if (u->rtpoll_item_write) {
1336         pa_rtpoll_item_free(u->rtpoll_item_write);
1337         u->rtpoll_item_write = NULL;
1338     }
1339 }
1340
1341 /* Called from source I/O thread context. */
1342 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1343     struct userdata *u;
1344
1345     pa_source_output_assert_ref(o);
1346     pa_source_output_assert_io_context(o);
1347     pa_assert_se(u = o->userdata);
1348
1349     pa_log_debug("Source output %d state %d", o->index, state);
1350 }
1351
1352 /* Called from sink I/O thread context. */
1353 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1354     struct userdata *u;
1355
1356     pa_sink_input_assert_ref(i);
1357     pa_assert_se(u = i->userdata);
1358
1359     pa_log_debug("Sink input %d state %d", i->index, state);
1360
1361     /* If we are added for the first time, ask for a rewinding so that
1362      * we are heard right-away. */
1363     if (PA_SINK_INPUT_IS_LINKED(state) &&
1364         i->thread_info.state == PA_SINK_INPUT_INIT) {
1365         pa_log_debug("Requesting rewind due to state change.");
1366         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1367     }
1368 }
1369
1370 /* Called from main context. */
1371 static void source_output_kill_cb(pa_source_output *o) {
1372     struct userdata *u;
1373
1374     pa_source_output_assert_ref(o);
1375     pa_assert_ctl_context();
1376     pa_assert_se(u = o->userdata);
1377
1378     u->dead = TRUE;
1379
1380     /* The order here matters! We first kill the source output, followed
1381      * by the source. That means the source callbacks must be protected
1382      * against an unconnected source output! */
1383     pa_source_output_unlink(u->source_output);
1384     pa_source_unlink(u->source);
1385
1386     pa_source_output_unref(u->source_output);
1387     u->source_output = NULL;
1388
1389     pa_source_unref(u->source);
1390     u->source = NULL;
1391
1392     pa_log_debug("Source output kill %d", o->index);
1393
1394     pa_module_unload_request(u->module, TRUE);
1395 }
1396
1397 /* Called from main context */
1398 static void sink_input_kill_cb(pa_sink_input *i) {
1399     struct userdata *u;
1400
1401     pa_sink_input_assert_ref(i);
1402     pa_assert_se(u = i->userdata);
1403
1404     u->dead = TRUE;
1405
1406     /* The order here matters! We first kill the sink input, followed
1407      * by the sink. That means the sink callbacks must be protected
1408      * against an unconnected sink input! */
1409     pa_sink_input_unlink(u->sink_input);
1410     pa_sink_unlink(u->sink);
1411
1412     pa_sink_input_unref(u->sink_input);
1413     u->sink_input = NULL;
1414
1415     pa_sink_unref(u->sink);
1416     u->sink = NULL;
1417
1418     pa_log_debug("Sink input kill %d", i->index);
1419
1420     pa_module_unload_request(u->module, TRUE);
1421 }
1422
1423 /* Called from main context. */
1424 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1425     struct userdata *u;
1426
1427     pa_source_output_assert_ref(o);
1428     pa_assert_ctl_context();
1429     pa_assert_se(u = o->userdata);
1430
1431     if (u->dead || u->autoloaded)
1432         return FALSE;
1433
1434     return (u->source != dest) && (u->sink != dest->monitor_of);
1435 }
1436
1437 /* Called from main context */
1438 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1439     struct userdata *u;
1440
1441     pa_sink_input_assert_ref(i);
1442     pa_assert_se(u = i->userdata);
1443
1444     if (u->dead || u->autoloaded)
1445         return FALSE;
1446
1447     return u->sink != dest;
1448 }
1449
1450 /* Called from main context. */
1451 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1452     struct userdata *u;
1453
1454     pa_source_output_assert_ref(o);
1455     pa_assert_ctl_context();
1456     pa_assert_se(u = o->userdata);
1457
1458     if (dest) {
1459         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1460         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1461     } else
1462         pa_source_set_asyncmsgq(u->source, NULL);
1463
1464     if (u->source_auto_desc && dest) {
1465         const char *y, *z;
1466         pa_proplist *pl;
1467
1468         pl = pa_proplist_new();
1469         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1470         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1471         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1472                 y ? y : u->sink_input->sink->name);
1473
1474         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1475         pa_proplist_free(pl);
1476     }
1477 }
1478
1479 /* Called from main context */
1480 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1481     struct userdata *u;
1482
1483     pa_sink_input_assert_ref(i);
1484     pa_assert_se(u = i->userdata);
1485
1486     if (dest) {
1487         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1488         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1489     } else
1490         pa_sink_set_asyncmsgq(u->sink, NULL);
1491
1492     if (u->sink_auto_desc && dest) {
1493         const char *y, *z;
1494         pa_proplist *pl;
1495
1496         pl = pa_proplist_new();
1497         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1498         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1499         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1500                          y ? y : u->source_output->source->name);
1501
1502         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1503         pa_proplist_free(pl);
1504     }
1505 }
1506
1507 /* Called from main context */
1508 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1509     struct userdata *u;
1510
1511     pa_sink_input_assert_ref(i);
1512     pa_assert_se(u = i->userdata);
1513
1514     pa_sink_volume_changed(u->sink, &i->volume);
1515 }
1516
1517 /* Called from main context */
1518 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1519     struct userdata *u;
1520
1521     pa_sink_input_assert_ref(i);
1522     pa_assert_se(u = i->userdata);
1523
1524     pa_sink_mute_changed(u->sink, i->muted);
1525 }
1526
1527 /* Called from main context */
1528 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1529     struct pa_echo_canceller_msg *msg;
1530     struct userdata *u;
1531
1532     pa_assert(o);
1533
1534     msg = PA_ECHO_CANCELLER_MSG(o);
1535     u = msg->userdata;
1536
1537     switch (code) {
1538         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1539             pa_cvolume *v = (pa_cvolume *) userdata;
1540
1541             if (u->use_volume_sharing)
1542                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1543             else
1544                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1545
1546             break;
1547         }
1548
1549         default:
1550             pa_assert_not_reached();
1551             break;
1552     }
1553
1554     return 0;
1555 }
1556
1557 /* Called by the canceller, so source I/O thread context. */
1558 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1559     *v = ec->msg->userdata->thread_info.current_volume;
1560 }
1561
1562 /* Called by the canceller, so source I/O thread context. */
1563 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1564     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1565         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1566
1567         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1568                 pa_xfree);
1569     }
1570 }
1571
1572 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1573     if (pa_streq(method, "null"))
1574         return PA_ECHO_CANCELLER_NULL;
1575 #ifdef HAVE_SPEEX
1576     if (pa_streq(method, "speex"))
1577         return PA_ECHO_CANCELLER_SPEEX;
1578 #endif
1579 #ifdef HAVE_ADRIAN_EC
1580     if (pa_streq(method, "adrian"))
1581         return PA_ECHO_CANCELLER_ADRIAN;
1582 #endif
1583 #ifdef HAVE_WEBRTC
1584     if (pa_streq(method, "webrtc"))
1585         return PA_ECHO_CANCELLER_WEBRTC;
1586 #endif
1587     return PA_ECHO_CANCELLER_INVALID;
1588 }
1589
1590 /* Common initialisation bits between module-echo-cancel and the standalone
1591  * test program.
1592  *
1593  * Called from main context. */
1594 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1595     const char *ec_string;
1596     pa_echo_canceller_method_t ec_method;
1597
1598     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1599         pa_log("Invalid sample format specification or channel map");
1600         goto fail;
1601     }
1602
1603     u->ec = pa_xnew0(pa_echo_canceller, 1);
1604     if (!u->ec) {
1605         pa_log("Failed to alloc echo canceller");
1606         goto fail;
1607     }
1608
1609     ec_string = pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER);
1610     if ((ec_method = get_ec_method_from_string(ec_string)) < 0) {
1611         pa_log("Invalid echo canceller implementation");
1612         goto fail;
1613     }
1614
1615     pa_log_info("Using AEC engine: %s", ec_string);
1616
1617     u->ec->init = ec_table[ec_method].init;
1618     u->ec->play = ec_table[ec_method].play;
1619     u->ec->record = ec_table[ec_method].record;
1620     u->ec->set_drift = ec_table[ec_method].set_drift;
1621     u->ec->run = ec_table[ec_method].run;
1622     u->ec->done = ec_table[ec_method].done;
1623
1624     return 0;
1625
1626 fail:
1627     return -1;
1628 }
1629
1630 /* Called from main context. */
1631 int pa__init(pa_module*m) {
1632     struct userdata *u;
1633     pa_sample_spec source_ss, sink_ss;
1634     pa_channel_map source_map, sink_map;
1635     pa_modargs *ma;
1636     pa_source *source_master=NULL;
1637     pa_sink *sink_master=NULL;
1638     pa_source_output_new_data source_output_data;
1639     pa_sink_input_new_data sink_input_data;
1640     pa_source_new_data source_data;
1641     pa_sink_new_data sink_data;
1642     pa_memchunk silence;
1643     uint32_t temp;
1644     uint32_t nframes = 0;
1645
1646     pa_assert(m);
1647
1648     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1649         pa_log("Failed to parse module arguments.");
1650         goto fail;
1651     }
1652
1653     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1654         pa_log("Master source not found");
1655         goto fail;
1656     }
1657     pa_assert(source_master);
1658
1659     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1660         pa_log("Master sink not found");
1661         goto fail;
1662     }
1663     pa_assert(sink_master);
1664
1665     if (source_master->monitor_of == sink_master) {
1666         pa_log("Can't cancel echo between a sink and its monitor");
1667         goto fail;
1668     }
1669
1670     source_ss = source_master->sample_spec;
1671     source_ss.rate = DEFAULT_RATE;
1672     source_ss.channels = DEFAULT_CHANNELS;
1673     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1674
1675     sink_ss = sink_master->sample_spec;
1676     sink_map = sink_master->channel_map;
1677
1678     u = pa_xnew0(struct userdata, 1);
1679     if (!u) {
1680         pa_log("Failed to alloc userdata");
1681         goto fail;
1682     }
1683     u->core = m->core;
1684     u->module = m;
1685     m->userdata = u;
1686     u->dead = FALSE;
1687
1688     u->use_volume_sharing = TRUE;
1689     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1690         pa_log("use_volume_sharing= expects a boolean argument");
1691         goto fail;
1692     }
1693
1694     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1695     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1696         pa_log("Failed to parse adjust_time value");
1697         goto fail;
1698     }
1699
1700     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1701         u->adjust_time = temp * PA_USEC_PER_SEC;
1702     else
1703         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1704
1705     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1706     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1707         pa_log("Failed to parse adjust_threshold value");
1708         goto fail;
1709     }
1710
1711     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1712         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1713     else
1714         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1715
1716     u->save_aec = DEFAULT_SAVE_AEC;
1717     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1718         pa_log("Failed to parse save_aec value");
1719         goto fail;
1720     }
1721
1722     u->autoloaded = DEFAULT_AUTOLOADED;
1723     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1724         pa_log("Failed to parse autoloaded value");
1725         goto fail;
1726     }
1727
1728     if (init_common(ma, u, &source_ss, &source_map) < 0)
1729         goto fail;
1730
1731     u->asyncmsgq = pa_asyncmsgq_new(0);
1732     u->need_realign = TRUE;
1733
1734     pa_assert(u->ec->init);
1735     if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &nframes, pa_modargs_get_value(ma, "aec_args", NULL))) {
1736         pa_log("Failed to init AEC engine");
1737         goto fail;
1738     }
1739
1740     u->source_blocksize = nframes * pa_frame_size(&source_ss);
1741     u->sink_blocksize = nframes * pa_frame_size(&sink_ss);
1742
1743     if (u->ec->params.drift_compensation)
1744         pa_assert(u->ec->set_drift);
1745
1746     /* Create source */
1747     pa_source_new_data_init(&source_data);
1748     source_data.driver = __FILE__;
1749     source_data.module = m;
1750     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1751         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1752     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1753     pa_source_new_data_set_channel_map(&source_data, &source_map);
1754     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1755     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1756     if (!u->autoloaded)
1757         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1758
1759     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1760         pa_log("Invalid properties");
1761         pa_source_new_data_done(&source_data);
1762         goto fail;
1763     }
1764
1765     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1766         const char *y, *z;
1767
1768         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1769         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1770         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1771                 z ? z : source_master->name, y ? y : sink_master->name);
1772     }
1773
1774     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1775                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1776     pa_source_new_data_done(&source_data);
1777
1778     if (!u->source) {
1779         pa_log("Failed to create source.");
1780         goto fail;
1781     }
1782
1783     u->source->parent.process_msg = source_process_msg_cb;
1784     u->source->set_state = source_set_state_cb;
1785     u->source->update_requested_latency = source_update_requested_latency_cb;
1786     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1787     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1788     if (!u->use_volume_sharing) {
1789         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1790         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1791         pa_source_enable_decibel_volume(u->source, TRUE);
1792     }
1793     u->source->userdata = u;
1794
1795     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1796
1797     /* Create sink */
1798     pa_sink_new_data_init(&sink_data);
1799     sink_data.driver = __FILE__;
1800     sink_data.module = m;
1801     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1802         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1803     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1804     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1805     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1806     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1807     if (!u->autoloaded)
1808         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1809
1810     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1811         pa_log("Invalid properties");
1812         pa_sink_new_data_done(&sink_data);
1813         goto fail;
1814     }
1815
1816     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1817         const char *y, *z;
1818
1819         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1820         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1821         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1822                 z ? z : sink_master->name, y ? y : source_master->name);
1823     }
1824
1825     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1826                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1827     pa_sink_new_data_done(&sink_data);
1828
1829     if (!u->sink) {
1830         pa_log("Failed to create sink.");
1831         goto fail;
1832     }
1833
1834     u->sink->parent.process_msg = sink_process_msg_cb;
1835     u->sink->set_state = sink_set_state_cb;
1836     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1837     u->sink->request_rewind = sink_request_rewind_cb;
1838     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1839     if (!u->use_volume_sharing) {
1840         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1841         pa_sink_enable_decibel_volume(u->sink, TRUE);
1842     }
1843     u->sink->userdata = u;
1844
1845     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1846
1847     /* Create source output */
1848     pa_source_output_new_data_init(&source_output_data);
1849     source_output_data.driver = __FILE__;
1850     source_output_data.module = m;
1851     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1852     source_output_data.destination_source = u->source;
1853     /* FIXME
1854        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1855
1856     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1857     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1858     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1859     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1860
1861     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1862     pa_source_output_new_data_done(&source_output_data);
1863
1864     if (!u->source_output)
1865         goto fail;
1866
1867     u->source_output->parent.process_msg = source_output_process_msg_cb;
1868     u->source_output->push = source_output_push_cb;
1869     u->source_output->process_rewind = source_output_process_rewind_cb;
1870     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1871     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1872     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1873     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1874     u->source_output->kill = source_output_kill_cb;
1875     u->source_output->attach = source_output_attach_cb;
1876     u->source_output->detach = source_output_detach_cb;
1877     u->source_output->state_change = source_output_state_change_cb;
1878     u->source_output->may_move_to = source_output_may_move_to_cb;
1879     u->source_output->moving = source_output_moving_cb;
1880     u->source_output->userdata = u;
1881
1882     u->source->output_from_master = u->source_output;
1883
1884     /* Create sink input */
1885     pa_sink_input_new_data_init(&sink_input_data);
1886     sink_input_data.driver = __FILE__;
1887     sink_input_data.module = m;
1888     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1889     sink_input_data.origin_sink = u->sink;
1890     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1891     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1892     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1893     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1894     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1895
1896     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1897     pa_sink_input_new_data_done(&sink_input_data);
1898
1899     if (!u->sink_input)
1900         goto fail;
1901
1902     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1903     u->sink_input->pop = sink_input_pop_cb;
1904     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1905     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1906     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1907     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1908     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1909     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1910     u->sink_input->kill = sink_input_kill_cb;
1911     u->sink_input->attach = sink_input_attach_cb;
1912     u->sink_input->detach = sink_input_detach_cb;
1913     u->sink_input->state_change = sink_input_state_change_cb;
1914     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1915     u->sink_input->moving = sink_input_moving_cb;
1916     if (!u->use_volume_sharing)
1917         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1918     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1919     u->sink_input->userdata = u;
1920
1921     u->sink->input_to_master = u->sink_input;
1922
1923     pa_sink_input_get_silence(u->sink_input, &silence);
1924
1925     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1926         &source_ss, 1, 1, 0, &silence);
1927     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1928         &sink_ss, 1, 1, 0, &silence);
1929
1930     pa_memblock_unref(silence.memblock);
1931
1932     if (!u->source_memblockq || !u->sink_memblockq) {
1933         pa_log("Failed to create memblockq.");
1934         goto fail;
1935     }
1936
1937     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1938         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1939     else if (u->ec->params.drift_compensation) {
1940         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1941         u->adjust_time = 0;
1942         /* Perform resync just once to give the canceller a leg up */
1943         pa_atomic_store(&u->request_resync, 1);
1944     }
1945
1946     if (u->save_aec) {
1947         pa_log("Creating AEC files in /tmp");
1948         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1949         if (u->captured_file == NULL)
1950             perror ("fopen failed");
1951         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1952         if (u->played_file == NULL)
1953             perror ("fopen failed");
1954         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1955         if (u->canceled_file == NULL)
1956             perror ("fopen failed");
1957         if (u->ec->params.drift_compensation) {
1958             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1959             if (u->drift_file == NULL)
1960                 perror ("fopen failed");
1961         }
1962     }
1963
1964     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1965     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1966     u->ec->msg->userdata = u;
1967
1968     u->thread_info.current_volume = u->source->reference_volume;
1969
1970     pa_sink_put(u->sink);
1971     pa_source_put(u->source);
1972
1973     pa_sink_input_put(u->sink_input);
1974     pa_source_output_put(u->source_output);
1975     pa_modargs_free(ma);
1976
1977     return 0;
1978
1979 fail:
1980     if (ma)
1981         pa_modargs_free(ma);
1982
1983     pa__done(m);
1984
1985     return -1;
1986 }
1987
1988 /* Called from main context. */
1989 int pa__get_n_used(pa_module *m) {
1990     struct userdata *u;
1991
1992     pa_assert(m);
1993     pa_assert_se(u = m->userdata);
1994
1995     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1996 }
1997
1998 /* Called from main context. */
1999 void pa__done(pa_module*m) {
2000     struct userdata *u;
2001
2002     pa_assert(m);
2003
2004     if (!(u = m->userdata))
2005         return;
2006
2007     u->dead = TRUE;
2008
2009     /* See comments in source_output_kill_cb() above regarding
2010      * destruction order! */
2011
2012     if (u->time_event)
2013         u->core->mainloop->time_free(u->time_event);
2014
2015     if (u->source_output)
2016         pa_source_output_unlink(u->source_output);
2017     if (u->sink_input)
2018         pa_sink_input_unlink(u->sink_input);
2019
2020     if (u->source)
2021         pa_source_unlink(u->source);
2022     if (u->sink)
2023         pa_sink_unlink(u->sink);
2024
2025     if (u->source_output)
2026         pa_source_output_unref(u->source_output);
2027     if (u->sink_input)
2028         pa_sink_input_unref(u->sink_input);
2029
2030     if (u->source)
2031         pa_source_unref(u->source);
2032     if (u->sink)
2033         pa_sink_unref(u->sink);
2034
2035     if (u->source_memblockq)
2036         pa_memblockq_free(u->source_memblockq);
2037     if (u->sink_memblockq)
2038         pa_memblockq_free(u->sink_memblockq);
2039
2040     if (u->ec) {
2041         if (u->ec->done)
2042             u->ec->done(u->ec);
2043
2044         pa_xfree(u->ec);
2045     }
2046
2047     if (u->asyncmsgq)
2048         pa_asyncmsgq_unref(u->asyncmsgq);
2049
2050     if (u->save_aec) {
2051         if (u->played_file)
2052             fclose(u->played_file);
2053         if (u->captured_file)
2054             fclose(u->captured_file);
2055         if (u->canceled_file)
2056             fclose(u->canceled_file);
2057         if (u->drift_file)
2058             fclose(u->drift_file);
2059     }
2060
2061     pa_xfree(u);
2062 }
2063
#ifdef ECHO_CANCEL_TEST
/*
 * Stand-alone test program for running in the canceller on pre-recorded files.
 *
 * Usage: <prog> play_file rec_file out_file [module args] [aec_args] [drift_file]
 *
 * Without drift compensation the captured and played files are processed in
 * lock-step, one block each per iteration. With drift compensation the drift
 * file drives the run; it is a sequence of records:
 *   d <float>  -- pass a new drift value to the canceller
 *   c <bytes>  -- read <bytes> of captured data, process it, write the result
 *   p <bytes>  -- read <bytes> of played data and hand it to the canceller
 *
 * Returns 0 on success and -1 on any failure (bad usage, I/O error,
 * malformed drift file, canceller initialisation failure).
 */
int main(int argc, char* argv[]) {
    struct userdata u;
    pa_sample_spec source_ss, sink_ss;
    pa_channel_map source_map, sink_map;
    pa_modargs *ma = NULL;
    uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
    int unused PA_GCC_UNUSED;
    int ret = 0, i = 0;
    char c;
    float drift;
    uint32_t nframes;

    pa_memzero(&u, sizeof(u));

    if (argc < 4 || argc > 7) {
        goto usage;
    }

    u.captured_file = fopen(argv[2], "rb");
    if (u.captured_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.played_file = fopen(argv[1], "rb");
    if (u.played_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.canceled_file = fopen(argv[3], "wb");
    if (u.canceled_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }

    u.core = pa_xnew0(pa_core, 1);
    u.core->cpu_info.cpu_type = PA_CPU_X86;
    u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;

    if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    source_ss.format = PA_SAMPLE_S16LE;
    source_ss.rate = DEFAULT_RATE;
    source_ss.channels = DEFAULT_CHANNELS;
    pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);

    sink_ss.format = PA_SAMPLE_S16LE;
    sink_ss.rate = DEFAULT_RATE;
    sink_ss.channels = DEFAULT_CHANNELS;
    pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);

    if (init_common(ma, &u, &source_ss, &source_map) < 0)
        goto fail;

    if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &nframes,
                     (argc > 5) ? argv[5] : NULL )) {
        pa_log("Failed to init AEC engine");
        goto fail;
    }
    u.source_blocksize = nframes * pa_frame_size(&source_ss);
    u.sink_blocksize = nframes * pa_frame_size(&sink_ss);

    if (u.ec->params.drift_compensation) {
        if (argc < 7) {
            pa_log("Drift compensation enabled but drift file not specified");
            goto fail;
        }

        u.drift_file = fopen(argv[6], "rt");

        if (u.drift_file == NULL) {
            perror ("fopen failed");
            goto fail;
        }
    }

    /* rdata/cdata hold one source block, pdata holds one sink block. */
    rdata = pa_xmalloc(u.source_blocksize);
    pdata = pa_xmalloc(u.sink_blocksize);
    cdata = pa_xmalloc(u.source_blocksize);

    if (!u.ec->params.drift_compensation) {
        while (fread(rdata, u.source_blocksize, 1, u.captured_file) > 0) {
            if (fread(pdata, u.sink_blocksize, 1, u.played_file) == 0) {
                perror("Played file ended before captured file");
                goto fail;
            }

            u.ec->run(u.ec, rdata, pdata, cdata);

            unused = fwrite(cdata, u.source_blocksize, 1, u.canceled_file);
        }
    } else {
        while (fscanf(u.drift_file, "%c", &c) > 0) {
            switch (c) {
                case 'd':
                    /* fscanf() returns EOF (non-zero) at end of file, so
                     * compare against the expected item count instead of
                     * testing for zero. */
                    if (fscanf(u.drift_file, "%a", &drift) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    u.ec->set_drift(u.ec, drift);

                    break;

                case 'c':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    /* The size comes from the drift file; never read more
                     * than the buffers allocated above can hold. */
                    if (i <= 0 || (size_t) i > u.source_blocksize) {
                        pa_log("Invalid capture chunk size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(rdata, i, 1, u.captured_file) == 0) {
                        perror("Captured file ended prematurely");
                        goto fail;
                    }

                    u.ec->record(u.ec, rdata, cdata);

                    unused = fwrite(cdata, i, 1, u.canceled_file);

                    break;

                case 'p':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    if (i <= 0 || (size_t) i > u.sink_blocksize) {
                        pa_log("Invalid playback chunk size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(pdata, i, 1, u.played_file) == 0) {
                        perror("Played file ended prematurely");
                        goto fail;
                    }

                    u.ec->play(u.ec, pdata);

                    break;

                default:
                    /* "%c" also returns the whitespace between records;
                     * anything that is not a record tag is skipped. */
                    break;
            }
        }

        /* Warn if at least one more full block is left in either input.
         * Each read is sized by its own buffer rather than by the last
         * value parsed from the drift file, which may be unset (no c/p
         * record at all) or may belong to the other stream. */
        if (fread(rdata, u.source_blocksize, 1, u.captured_file) > 0)
            pa_log("All capture data was not consumed");
        if (fread(pdata, u.sink_blocksize, 1, u.played_file) > 0)
            pa_log("All playback data was not consumed");
    }

    u.ec->done(u.ec);

out:
    if (u.captured_file)
        fclose(u.captured_file);
    if (u.played_file)
        fclose(u.played_file);
    if (u.canceled_file)
        fclose(u.canceled_file);
    if (u.drift_file)
        fclose(u.drift_file);

    pa_xfree(rdata);
    pa_xfree(pdata);
    pa_xfree(cdata);

    pa_xfree(u.ec);
    pa_xfree(u.core);

    if (ma)
        pa_modargs_free(ma);

    return ret;

usage:
    pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);

fail:
    ret = -1;
    goto out;
}
#endif /* ECHO_CANCEL_TEST */