echo-cancel: Use more human-friendly descriptions
[profile/ivi/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
/* Module metadata. The usage string lists every module argument; the same
 * argument names appear in the valid_modargs[] table further down, so the two
 * lists have to be kept in sync when an argument is added or removed. */
58 PA_MODULE_AUTHOR("Wim Taymans");
59 PA_MODULE_DESCRIPTION("Echo Cancellation");
60 PA_MODULE_VERSION(PACKAGE_VERSION);
61 PA_MODULE_LOAD_ONCE(FALSE);
62 PA_MODULE_USAGE(
63         _("source_name=<name for the source> "
64           "source_properties=<properties for the source> "
65           "source_master=<name of source to filter> "
66           "sink_name=<name for the sink> "
67           "sink_properties=<properties for the sink> "
68           "sink_master=<name of sink to filter> "
69           "adjust_time=<how often to readjust rates in s> "
70           "adjust_threshold=<how much drift to readjust after in ms> "
71           "format=<sample format> "
72           "rate=<sample rate> "
73           "channels=<number of channels> "
74           "channel_map=<channel map> "
75           "aec_method=<implementation to use> "
76           "aec_args=<parameters for the AEC engine> "
77           "save_aec=<save AEC data in /tmp> "
78           "autoloaded=<set if this module is being loaded automatically> "
79           "use_volume_sharing=<yes or no> "
80         ));
81
82 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
/* Identifies one echo canceller implementation. The non-negative values are
 * numbered from 0 in the same order as the entries of ec_table[];
 * PA_ECHO_CANCELLER_INVALID (-1) matches no entry. The WebRTC value only
 * exists when HAVE_WEBRTC is defined. */
83 typedef enum {
84     PA_ECHO_CANCELLER_INVALID = -1,
85     PA_ECHO_CANCELLER_SPEEX = 0,
86     PA_ECHO_CANCELLER_ADRIAN,
87 #ifdef HAVE_WEBRTC
88     PA_ECHO_CANCELLER_WEBRTC,
89 #endif
90 } pa_echo_canceller_method_t;
91
/* Name of the default echo canceller implementation: WebRTC when the module
 * is built with it, Speex otherwise. This uses #ifdef (not #if) so that it
 * selects WebRTC under exactly the same condition that adds
 * PA_ECHO_CANCELLER_WEBRTC to the enum and the WebRTC entry to ec_table[]. */
#ifdef HAVE_WEBRTC
#define DEFAULT_ECHO_CANCELLER "webrtc"
#else
#define DEFAULT_ECHO_CANCELLER "speex"
#endif
97
/* Table of echo canceller backends, one entry per implementation. The entries
 * are listed in the same order as the non-negative values of
 * pa_echo_canceller_method_t (presumably so the enum value can be used as the
 * index -- the lookup itself is not in this part of the file).
 *
 * The Speex and Adrian backends only provide init/run/done. The WebRTC entry
 * additionally provides play/record/set_drift, which are the callbacks
 * do_push_drift_comp() invokes. */
98 static const pa_echo_canceller ec_table[] = {
99     {
100         /* Speex */
101         .init                   = pa_speex_ec_init,
102         .run                    = pa_speex_ec_run,
103         .done                   = pa_speex_ec_done,
104     },
105     {
106         /* Adrian Andre's NLMS implementation */
107         .init                   = pa_adrian_ec_init,
108         .run                    = pa_adrian_ec_run,
109         .done                   = pa_adrian_ec_done,
110     },
111 #ifdef HAVE_WEBRTC
112     {
113         /* WebRTC's audio processing engine */
114         .init                   = pa_webrtc_ec_init,
115         .play                   = pa_webrtc_ec_play,
116         .record                 = pa_webrtc_ec_record,
117         .set_drift              = pa_webrtc_ec_set_drift,
118         .run                    = pa_webrtc_ec_run,
119         .done                   = pa_webrtc_ec_done,
120     },
121 #endif
122 };
123
/* Built-in defaults. Judging by their names these back the rate, channels,
 * adjust_time, adjust_threshold, save_aec and autoloaded module arguments
 * (the argument parsing is not in this part of the file -- confirm there).
 * The adjust time is one second and the tolerance five milliseconds, both
 * expressed in microseconds. */
124 #define DEFAULT_RATE 32000
125 #define DEFAULT_CHANNELS 1
126 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
127 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
128 #define DEFAULT_SAVE_AEC FALSE
129 #define DEFAULT_AUTOLOADED FALSE
130
/* 16 MiB; presumably the maximum length given to the module's memblockqs
 * (their creation is not in this part of the file). */
131 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
132
133 /* Can only be used in main context */
/* True only while both the virtual source and the virtual sink are in the
 * RUNNING state. Evaluates its argument twice. */
134 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
135                       (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
136
137 /* This module creates a new (virtual) source and sink.
138  *
139  * The data sent to the new sink is kept in a memblockq before being
140  * forwarded to the real sink_master.
141  *
142  * Data read from source_master is matched against the saved sink data and
143  * echo canceled data is then pushed onto the new source.
144  *
145  * Both source and sink masters have their own threads to push/pull data
146  * respectively. We however perform all our actions in the source IO thread.
147  * To do this we send all played samples to the source IO thread where they
148  * are then pushed into the memblockq.
149  *
150  * Alignment is performed in two steps:
151  *
152  * 1) when something happens that requires quick adjustment of the alignment of
153  *    capture and playback samples, we perform a resync. This adjusts the
154  *    position in the playback memblock to the requested sample. Quick
155  *    adjustments include moving the playback samples before the capture
156  *    samples (because else the echo canceler does not work) or when the
157  *    playback pointer drifts too far away.
158  *
159  * 2) periodically check the difference between capture and playback. we use a
160  *    low and high watermark for adjusting the alignment. playback should always
161  *    be before capture and the difference should not be bigger than one frame
162  *    size. We would ideally like to resample the sink_input but most drivers
163  *    don't give enough accuracy to be able to do that right now.
164  */
165
166 struct userdata;
167
/* Private message object type for the echo canceller. It extends pa_msgobject
 * with a back-pointer to the module's userdata so that a message handler can
 * get at the module state. PA_ECHO_CANCELLER_MSG() is the checked cast
 * generated by PA_DEFINE_PRIVATE_CLASS. */
168 struct pa_echo_canceller_msg {
169     pa_msgobject parent;
170     struct userdata *userdata;
171 };
172
173 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
174 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
175
/* Timing and queue state of the playback and capture paths, consumed by
 * calc_diff(). time_callback() passes the same structure with both the
 * SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT and the
 * SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT message; do_resync() fills the source
 * part via source_output_snapshot_within_thread() instead.
 *
 * calc_diff() treats the *_now and *_latency fields as times in usec and the
 * *_delay, rlen and plen fields as byte counts (it converts their sum with
 * pa_bytes_to_usec()). */
176 struct snapshot {
177     pa_usec_t sink_now;
178     pa_usec_t sink_latency;
179     size_t sink_delay;
180     int64_t send_counter;
181
182     pa_usec_t source_now;
183     pa_usec_t source_latency;
184     size_t source_delay;
185     int64_t recv_counter;
186     size_t rlen; /* compared against plen in calc_diff() */
187     size_t plen; /* only the excess of plen over rlen is counted */
188 };
189
/* Per-instance state of the echo-cancel module. It is reachable from the
 * virtual source and sink through their userdata pointer (see the callbacks
 * below). */
190 struct userdata {
191     pa_core *core;
192     pa_module *module;
193
194     pa_bool_t autoloaded;
195     pa_bool_t dead;
196     pa_bool_t save_aec; /* when set, do_push()/do_push_drift_comp() dump data to the FILE handles below */
197
198     pa_echo_canceller *ec;
199     uint32_t blocksize; /* size in bytes of each chunk handed to the canceller */
200
201     pa_bool_t need_realign;
202
203     /* to wakeup the source I/O thread */
204     pa_asyncmsgq *asyncmsgq;
205     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
206
207     pa_source *source;
208     pa_bool_t source_auto_desc;
209     pa_source_output *source_output;
210     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
211     size_t source_skip; /* bytes of capture data to skip; set by apply_diff_time() */
212
213     pa_sink *sink;
214     pa_bool_t sink_auto_desc;
215     pa_sink_input *sink_input;
216     pa_memblockq *sink_memblockq;
217     int64_t send_counter;          /* updated in sink IO thread */
218     int64_t recv_counter;
219     size_t sink_skip; /* bytes of playback data to drop; set by apply_diff_time() */
220
221     /* Bytes left over from previous iteration */
222     size_t sink_rem;
223     size_t source_rem;
224
225     pa_atomic_t request_resync; /* set to 1 by the state callbacks, consumed in source_output_push_cb() */
226
227     pa_time_event *time_event; /* timer that fires time_callback() */
228     pa_usec_t adjust_time; /* timer period in usec; 0 means the state callbacks never arm the timer */
229     int adjust_threshold; /* drift above which time_callback() requests an immediate adjustment */
230
231     FILE *captured_file;
232     FILE *played_file;
233     FILE *canceled_file;
234     FILE *drift_file;
235
236     pa_bool_t use_volume_sharing;
237
238     struct {
239         pa_cvolume current_volume; /* copy of source->reference_volume, refreshed on SET_VOLUME_SYNCED */
240     } thread_info;
241 };
242
243 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
244
/* NULL-terminated list of module argument names. It contains the same
 * names, in the same order, as the PA_MODULE_USAGE string near the top of
 * the file; keep the two in sync. */
245 static const char* const valid_modargs[] = {
246     "source_name",
247     "source_properties",
248     "source_master",
249     "sink_name",
250     "sink_properties",
251     "sink_master",
252     "adjust_time",
253     "adjust_threshold",
254     "format",
255     "rate",
256     "channels",
257     "channel_map",
258     "aec_method",
259     "aec_args",
260     "save_aec",
261     "autoloaded",
262     "use_volume_sharing",
263     NULL
264 };
265
266 enum {
267     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
268     SOURCE_OUTPUT_MESSAGE_REWIND,
269     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
270     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
271 };
272
273 enum {
274     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
275 };
276
277 enum {
278     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
279 };
280
281 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
282     int64_t buffer, diff_time, buffer_latency;
283
284     /* get the number of samples between capture and playback */
285     if (snapshot->plen > snapshot->rlen)
286         buffer = snapshot->plen - snapshot->rlen;
287     else
288         buffer = 0;
289
290     buffer += snapshot->source_delay + snapshot->sink_delay;
291
292     /* add the amount of samples not yet transferred to the source context */
293     if (snapshot->recv_counter <= snapshot->send_counter)
294         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
295     else
296         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
297
298     /* convert to time */
299     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
300
301     /* capture and playback samples are perfectly aligned when diff_time is 0 */
302     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
303           (snapshot->source_now - snapshot->source_latency);
304
305     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
306         (long long) snapshot->sink_latency,
307         (long long) buffer_latency, (long long) snapshot->source_latency,
308         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
309         (long long) (snapshot->send_counter - snapshot->recv_counter),
310         (long long) (snapshot->sink_now - snapshot->source_now));
311
312     return diff_time;
313 }
314
315 /* Called from main context */
316 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
317     struct userdata *u = userdata;
318     uint32_t old_rate, base_rate, new_rate;
319     int64_t diff_time;
320     /*size_t fs*/
321     struct snapshot latency_snapshot;
322
323     pa_assert(u);
324     pa_assert(a);
325     pa_assert(u->time_event == e);
326     pa_assert_ctl_context();
327
328     if (!IS_ACTIVE(u))
329         return;
330
331     /* update our snapshots */
332     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
333     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
334
335     /* calculate drift between capture and playback */
336     diff_time = calc_diff(u, &latency_snapshot);
337
338     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
339     old_rate = u->sink_input->sample_spec.rate;
340     base_rate = u->source_output->sample_spec.rate;
341
342     if (diff_time < 0) {
343         /* recording before playback, we need to adjust quickly. The echo
344          * canceler does not work in this case. */
345         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
346             NULL, diff_time, NULL, NULL);
347         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
348         new_rate = base_rate;
349     }
350     else {
351         if (diff_time > u->adjust_threshold) {
352             /* diff too big, quickly adjust */
353             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
354                 NULL, diff_time, NULL, NULL);
355         }
356
357         /* recording behind playback, we need to slowly adjust the rate to match */
358         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
359
360         /* assume equal samplerates for now */
361         new_rate = base_rate;
362     }
363
364     /* make sure we don't make too big adjustments because that sounds horrible */
365     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
366         new_rate = base_rate;
367
368     if (new_rate != old_rate) {
369         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
370
371         pa_sink_input_set_rate(u->sink_input, new_rate);
372     }
373
374     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
375 }
376
377 /* Called from source I/O thread context */
378 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
379     struct userdata *u = PA_SOURCE(o)->userdata;
380
381     switch (code) {
382
383         case PA_SOURCE_MESSAGE_GET_LATENCY:
384
385             /* The source is _put() before the source output is, so let's
386              * make sure we don't access it in that time. Also, the
387              * source output is first shut down, the source second. */
388             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
389                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
390                 *((pa_usec_t*) data) = 0;
391                 return 0;
392             }
393
394             *((pa_usec_t*) data) =
395
396                 /* Get the latency of the master source */
397                 pa_source_get_latency_within_thread(u->source_output->source) +
398                 /* Add the latency internal to our source output on top */
399                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
400                 /* and the buffering we do on the source */
401                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
402
403             return 0;
404
405         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
406             u->thread_info.current_volume = u->source->reference_volume;
407             break;
408     }
409
410     return pa_source_process_msg(o, code, data, offset, chunk);
411 }
412
413 /* Called from sink I/O thread context */
414 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
415     struct userdata *u = PA_SINK(o)->userdata;
416
417     switch (code) {
418
419         case PA_SINK_MESSAGE_GET_LATENCY:
420
421             /* The sink is _put() before the sink input is, so let's
422              * make sure we don't access it in that time. Also, the
423              * sink input is first shut down, the sink second. */
424             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
425                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
426                 *((pa_usec_t*) data) = 0;
427                 return 0;
428             }
429
430             *((pa_usec_t*) data) =
431
432                 /* Get the latency of the master sink */
433                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
434
435                 /* Add the latency internal to our sink input on top */
436                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
437
438             return 0;
439     }
440
441     return pa_sink_process_msg(o, code, data, offset, chunk);
442 }
443
444
445 /* Called from main context */
446 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
447     struct userdata *u;
448
449     pa_source_assert_ref(s);
450     pa_assert_se(u = s->userdata);
451
452     if (!PA_SOURCE_IS_LINKED(state) ||
453         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
454         return 0;
455
456     if (state == PA_SOURCE_RUNNING) {
457         /* restart timer when both sink and source are active */
458         if (IS_ACTIVE(u) && u->adjust_time)
459             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
460
461         pa_atomic_store(&u->request_resync, 1);
462         pa_source_output_cork(u->source_output, FALSE);
463     } else if (state == PA_SOURCE_SUSPENDED) {
464         pa_source_output_cork(u->source_output, TRUE);
465     }
466
467     return 0;
468 }
469
470 /* Called from main context */
471 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
472     struct userdata *u;
473
474     pa_sink_assert_ref(s);
475     pa_assert_se(u = s->userdata);
476
477     if (!PA_SINK_IS_LINKED(state) ||
478         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
479         return 0;
480
481     if (state == PA_SINK_RUNNING) {
482         /* restart timer when both sink and source are active */
483         if (IS_ACTIVE(u) && u->adjust_time)
484             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
485
486         pa_atomic_store(&u->request_resync, 1);
487         pa_sink_input_cork(u->sink_input, FALSE);
488     } else if (state == PA_SINK_SUSPENDED) {
489         pa_sink_input_cork(u->sink_input, TRUE);
490     }
491
492     return 0;
493 }
494
495 /* Called from I/O thread context */
496 static void source_update_requested_latency_cb(pa_source *s) {
497     struct userdata *u;
498
499     pa_source_assert_ref(s);
500     pa_assert_se(u = s->userdata);
501
502     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
503         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
504         return;
505
506     pa_log_debug("Source update requested latency");
507
508     /* Just hand this one over to the master source */
509     pa_source_output_set_requested_latency_within_thread(
510             u->source_output,
511             pa_source_get_requested_latency_within_thread(s));
512 }
513
514 /* Called from I/O thread context */
515 static void sink_update_requested_latency_cb(pa_sink *s) {
516     struct userdata *u;
517
518     pa_sink_assert_ref(s);
519     pa_assert_se(u = s->userdata);
520
521     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
522         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
523         return;
524
525     pa_log_debug("Sink update requested latency");
526
527     /* Just hand this one over to the master sink */
528     pa_sink_input_set_requested_latency_within_thread(
529             u->sink_input,
530             pa_sink_get_requested_latency_within_thread(s));
531 }
532
533 /* Called from I/O thread context */
534 static void sink_request_rewind_cb(pa_sink *s) {
535     struct userdata *u;
536
537     pa_sink_assert_ref(s);
538     pa_assert_se(u = s->userdata);
539
540     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
541         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
542         return;
543
544     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
545
546     /* Just hand this one over to the master sink */
547     pa_sink_input_request_rewind(u->sink_input,
548                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
549 }
550
551 /* Called from main context */
552 static void source_set_volume_cb(pa_source *s) {
553     struct userdata *u;
554
555     pa_source_assert_ref(s);
556     pa_assert_se(u = s->userdata);
557
558     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
559         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
560         return;
561
562     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
563 }
564
565 /* Called from main context */
566 static void sink_set_volume_cb(pa_sink *s) {
567     struct userdata *u;
568
569     pa_sink_assert_ref(s);
570     pa_assert_se(u = s->userdata);
571
572     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
573         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
574         return;
575
576     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
577 }
578
579 static void source_get_volume_cb(pa_source *s) {
580     struct userdata *u;
581     pa_cvolume v;
582
583     pa_source_assert_ref(s);
584     pa_assert_se(u = s->userdata);
585
586     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
587         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
588         return;
589
590     pa_source_output_get_volume(u->source_output, &v, TRUE);
591
592     if (pa_cvolume_equal(&s->real_volume, &v))
593         /* no change */
594         return;
595
596     s->real_volume = v;
597     pa_source_set_soft_volume(s, NULL);
598 }
599
600 /* Called from main context */
601 static void source_set_mute_cb(pa_source *s) {
602     struct userdata *u;
603
604     pa_source_assert_ref(s);
605     pa_assert_se(u = s->userdata);
606
607     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
608         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
609         return;
610
611     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
612 }
613
614 /* Called from main context */
615 static void sink_set_mute_cb(pa_sink *s) {
616     struct userdata *u;
617
618     pa_sink_assert_ref(s);
619     pa_assert_se(u = s->userdata);
620
621     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
622         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
623         return;
624
625     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
626 }
627
628 /* Called from main context */
629 static void source_get_mute_cb(pa_source *s) {
630     struct userdata *u;
631
632     pa_source_assert_ref(s);
633     pa_assert_se(u = s->userdata);
634
635     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
636         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
637         return;
638
639     pa_source_output_get_mute(u->source_output);
640 }
641
642 /* must be called from the input thread context */
643 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
644     int64_t diff;
645
646     if (diff_time < 0) {
647         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
648
649         if (diff > 0) {
650             /* add some extra safety samples to compensate for jitter in the
651              * timings */
652             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
653
654             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
655
656             u->sink_skip = diff;
657             u->source_skip = 0;
658         }
659     } else if (diff_time > 0) {
660         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
661
662         if (diff > 0) {
663             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
664
665             u->source_skip = diff;
666             u->sink_skip = 0;
667         }
668     }
669 }
670
671 /* must be called from the input thread */
672 static void do_resync(struct userdata *u) {
673     int64_t diff_time;
674     struct snapshot latency_snapshot;
675
676     pa_log("Doing resync");
677
678     /* update our snapshot */
679     source_output_snapshot_within_thread(u, &latency_snapshot);
680     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
681
682     /* calculate drift between capture and playback */
683     diff_time = calc_diff(u, &latency_snapshot);
684
685     /* and adjust for the drift */
686     apply_diff_time(u, diff_time);
687 }
688
689 /* 1. Calculate drift at this point, pass to canceller
690  * 2. Push out playback samples in blocksize chunks
691  * 3. Push out capture samples in blocksize chunks
692  * 4. ???
693  * 5. Profit
694  */
695 static void do_push_drift_comp(struct userdata *u) {
696     size_t rlen, plen;
697     pa_memchunk rchunk, pchunk, cchunk;
698     uint8_t *rdata, *pdata, *cdata;
699     float drift;
700     int unused PA_GCC_UNUSED;
701
702     rlen = pa_memblockq_get_length(u->source_memblockq);
703     plen = pa_memblockq_get_length(u->sink_memblockq);
704
705     /* Estimate snapshot drift as follows:
706      *   pd: amount of data consumed since last time
707      *   rd: amount of data consumed since last time
708      *
709      *   drift = (pd - rd) / rd;
710      *
711      * We calculate pd and rd as the memblockq length less the number of
712      * samples left from the last iteration (to avoid double counting
713      * those remainder samples.
714      */
715     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
716     u->sink_rem = plen % u->blocksize;
717     u->source_rem = rlen % u->blocksize;
718
719     /* Now let the canceller work its drift compensation magic */
720     u->ec->set_drift(u->ec, drift);
721
722     if (u->save_aec) {
723         if (u->drift_file)
724             fprintf(u->drift_file, "d %a\n", drift);
725     }
726
727     /* Send in the playback samples first */
728     while (plen >= u->blocksize) {
729         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
730         pdata = pa_memblock_acquire(pchunk.memblock);
731         pdata += pchunk.index;
732
733         u->ec->play(u->ec, pdata);
734
735         if (u->save_aec) {
736             if (u->drift_file)
737                 fprintf(u->drift_file, "p %d\n", u->blocksize);
738             if (u->played_file)
739                 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
740         }
741
742         pa_memblock_release(pchunk.memblock);
743         pa_memblockq_drop(u->sink_memblockq, u->blocksize);
744         pa_memblock_unref(pchunk.memblock);
745
746         plen -= u->blocksize;
747     }
748
749     /* And now the capture samples */
750     while (rlen >= u->blocksize) {
751         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
752
753         rdata = pa_memblock_acquire(rchunk.memblock);
754         rdata += rchunk.index;
755
756         cchunk.index = 0;
757         cchunk.length = u->blocksize;
758         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
759         cdata = pa_memblock_acquire(cchunk.memblock);
760
761         u->ec->record(u->ec, rdata, cdata);
762
763         if (u->save_aec) {
764             if (u->drift_file)
765                 fprintf(u->drift_file, "c %d\n", u->blocksize);
766             if (u->captured_file)
767                 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
768             if (u->canceled_file)
769                 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
770         }
771
772         pa_memblock_release(cchunk.memblock);
773         pa_memblock_release(rchunk.memblock);
774
775         pa_memblock_unref(rchunk.memblock);
776
777         pa_source_post(u->source, &cchunk);
778         pa_memblock_unref(cchunk.memblock);
779
780         pa_memblockq_drop(u->source_memblockq, u->blocksize);
781         rlen -= u->blocksize;
782     }
783 }
784
785 /* This one's simpler than the drift compensation case -- we just iterate over
786  * the capture buffer, and pass the canceller blocksize bytes of playback and
787  * capture data. */
788 static void do_push(struct userdata *u) {
789     size_t rlen, plen;
790     pa_memchunk rchunk, pchunk, cchunk;
791     uint8_t *rdata, *pdata, *cdata;
792     int unused PA_GCC_UNUSED;
793
794     rlen = pa_memblockq_get_length(u->source_memblockq);
795     plen = pa_memblockq_get_length(u->sink_memblockq);
796
797     while (rlen >= u->blocksize) {
798         /* take fixed block from recorded samples */
799         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
800
801         if (plen > u->blocksize) {
802             if (plen > u->blocksize) {
803                 /* take fixed block from played samples */
804                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
805
806                 rdata = pa_memblock_acquire(rchunk.memblock);
807                 rdata += rchunk.index;
808                 pdata = pa_memblock_acquire(pchunk.memblock);
809                 pdata += pchunk.index;
810
811                 cchunk.index = 0;
812                 cchunk.length = u->blocksize;
813                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
814                 cdata = pa_memblock_acquire(cchunk.memblock);
815
816                 if (u->save_aec) {
817                     if (u->captured_file)
818                         unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
819                     if (u->played_file)
820                         unused = fwrite(pdata, 1, u->blocksize, u->played_file);
821                 }
822
823                 /* perform echo cancellation */
824                 u->ec->run(u->ec, rdata, pdata, cdata);
825
826                 if (u->save_aec) {
827                     if (u->canceled_file)
828                         unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
829                 }
830
831                 pa_memblock_release(cchunk.memblock);
832                 pa_memblock_release(pchunk.memblock);
833                 pa_memblock_release(rchunk.memblock);
834
835                 /* drop consumed sink samples */
836                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
837                 pa_memblock_unref(pchunk.memblock);
838
839                 pa_memblock_unref(rchunk.memblock);
840                 /* the filtered samples now become the samples from our
841                  * source */
842                 rchunk = cchunk;
843
844                 plen -= u->blocksize;
845             }
846         }
847
848         /* forward the (echo-canceled) data to the virtual source */
849         pa_source_post(u->source, &rchunk);
850         pa_memblock_unref(rchunk.memblock);
851
852         pa_memblockq_drop(u->source_memblockq, u->blocksize);
853         rlen -= u->blocksize;
854     }
855 }
856
857 /* Called from input thread context */
858 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
859     struct userdata *u;
860     size_t rlen, plen, to_skip;
861     pa_memchunk rchunk;
862
863     pa_source_output_assert_ref(o);
864     pa_source_output_assert_io_context(o);
865     pa_assert_se(u = o->userdata);
866
867     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
868         pa_log("push when no link?");
869         return;
870     }
871
872     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
873                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
874         pa_source_post(u->source, chunk);
875         return;
876     }
877
878     /* handle queued messages, do any message sending of our own */
879     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
880         ;
881
882     pa_memblockq_push_align(u->source_memblockq, chunk);
883
884     rlen = pa_memblockq_get_length(u->source_memblockq);
885     plen = pa_memblockq_get_length(u->sink_memblockq);
886
887     /* Let's not do anything else till we have enough data to process */
888     if (rlen < u->blocksize)
889         return;
890
891     /* See if we need to drop samples in order to sync */
892     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
893         do_resync(u);
894     }
895
896     /* Okay, skip cancellation for skipped source samples if needed. */
897     if (PA_UNLIKELY(u->source_skip)) {
898         /* The slightly tricky bit here is that we drop all but modulo
899          * blocksize bytes and then adjust for that last bit on the sink side.
900          * We do this because the source data is coming at a fixed rate, which
901          * means the only way to try to catch up is drop sink samples and let
902          * the canceller cope up with this. */
903         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
904         to_skip -= to_skip % u->blocksize;
905
906         if (to_skip) {
907             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
908             pa_source_post(u->source, &rchunk);
909
910             pa_memblock_unref(rchunk.memblock);
911             pa_memblockq_drop(u->source_memblockq, u->blocksize);
912
913             rlen -= to_skip;
914             u->source_skip -= to_skip;
915         }
916
917         if (rlen && u->source_skip % u->blocksize) {
918             u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
919             u->source_skip -= (u->source_skip % u->blocksize);
920         }
921     }
922
923     /* And for the sink, these samples have been played back already, so we can
924      * just drop them and get on with it. */
925     if (PA_UNLIKELY(u->sink_skip)) {
926         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
927
928         pa_memblockq_drop(u->sink_memblockq, to_skip);
929
930         plen -= to_skip;
931         u->sink_skip -= to_skip;
932     }
933
934     /* process and push out samples */
935     if (u->ec->params.drift_compensation)
936         do_push_drift_comp(u);
937     else
938         do_push(u);
939 }
940
941 /* Called from I/O thread context */
942 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
943     struct userdata *u;
944
945     pa_sink_input_assert_ref(i);
946     pa_assert(chunk);
947     pa_assert_se(u = i->userdata);
948
949     if (u->sink->thread_info.rewind_requested)
950         pa_sink_process_rewind(u->sink, 0);
951
952     pa_sink_render_full(u->sink, nbytes, chunk);
953
954     if (i->thread_info.underrun_for > 0) {
955         pa_log_debug("Handling end of underrun.");
956         pa_atomic_store(&u->request_resync, 1);
957     }
958
959     /* let source thread handle the chunk. pass the sample count as well so that
960      * the source IO thread can update the right variables. */
961     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
962         NULL, 0, chunk, NULL);
963     u->send_counter += chunk->length;
964
965     return 0;
966 }
967
968 /* Called from input thread context */
969 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
970     struct userdata *u;
971
972     pa_source_output_assert_ref(o);
973     pa_source_output_assert_io_context(o);
974     pa_assert_se(u = o->userdata);
975
976     pa_source_process_rewind(u->source, nbytes);
977
978     /* go back on read side, we need to use older sink data for this */
979     pa_memblockq_rewind(u->sink_memblockq, nbytes);
980
981     /* manipulate write index */
982     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
983
984     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
985         (long long) pa_memblockq_get_length (u->source_memblockq));
986 }
987
988 /* Called from I/O thread context */
989 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
990     struct userdata *u;
991
992     pa_sink_input_assert_ref(i);
993     pa_assert_se(u = i->userdata);
994
995     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
996
997     pa_sink_process_rewind(u->sink, nbytes);
998
999     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1000     u->send_counter -= nbytes;
1001 }
1002
1003 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1004     size_t delay, rlen, plen;
1005     pa_usec_t now, latency;
1006
1007     now = pa_rtclock_now();
1008     latency = pa_source_get_latency_within_thread(u->source_output->source);
1009     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1010
1011     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1012     rlen = pa_memblockq_get_length(u->source_memblockq);
1013     plen = pa_memblockq_get_length(u->sink_memblockq);
1014
1015     snapshot->source_now = now;
1016     snapshot->source_latency = latency;
1017     snapshot->source_delay = delay;
1018     snapshot->recv_counter = u->recv_counter;
1019     snapshot->rlen = rlen + u->sink_skip;
1020     snapshot->plen = plen + u->source_skip;
1021 }
1022
1023
1024 /* Called from output thread context */
1025 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1026     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1027
1028     switch (code) {
1029
1030         case SOURCE_OUTPUT_MESSAGE_POST:
1031
1032             pa_source_output_assert_io_context(u->source_output);
1033
1034             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1035                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1036             else
1037                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1038
1039             u->recv_counter += (int64_t) chunk->length;
1040
1041             return 0;
1042
1043         case SOURCE_OUTPUT_MESSAGE_REWIND:
1044             pa_source_output_assert_io_context(u->source_output);
1045
1046             /* manipulate write index, never go past what we have */
1047             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1048                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1049             else
1050                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1051
1052             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1053
1054             u->recv_counter -= offset;
1055
1056             return 0;
1057
1058         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1059             struct snapshot *snapshot = (struct snapshot *) data;
1060
1061             source_output_snapshot_within_thread(u, snapshot);
1062             return 0;
1063         }
1064
1065         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1066             apply_diff_time(u, offset);
1067             return 0;
1068
1069     }
1070
1071     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1072 }
1073
1074 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1075     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1076
1077     switch (code) {
1078
1079         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1080             size_t delay;
1081             pa_usec_t now, latency;
1082             struct snapshot *snapshot = (struct snapshot *) data;
1083
1084             pa_sink_input_assert_io_context(u->sink_input);
1085
1086             now = pa_rtclock_now();
1087             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1088             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1089
1090             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1091
1092             snapshot->sink_now = now;
1093             snapshot->sink_latency = latency;
1094             snapshot->sink_delay = delay;
1095             snapshot->send_counter = u->send_counter;
1096             return 0;
1097         }
1098     }
1099
1100     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1101 }
1102
1103 /* Called from I/O thread context */
1104 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1105     struct userdata *u;
1106
1107     pa_sink_input_assert_ref(i);
1108     pa_assert_se(u = i->userdata);
1109
1110     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1111
1112     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1113     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1114 }
1115
1116 /* Called from I/O thread context */
1117 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1118     struct userdata *u;
1119
1120     pa_source_output_assert_ref(o);
1121     pa_assert_se(u = o->userdata);
1122
1123     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1124
1125     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1126 }
1127
1128 /* Called from I/O thread context */
1129 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1130     struct userdata *u;
1131
1132     pa_sink_input_assert_ref(i);
1133     pa_assert_se(u = i->userdata);
1134
1135     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1136
1137     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1138 }
1139
1140 /* Called from I/O thread context */
1141 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1142     struct userdata *u;
1143     pa_usec_t latency;
1144
1145     pa_sink_input_assert_ref(i);
1146     pa_assert_se(u = i->userdata);
1147
1148     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1149
1150     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1151 }
1152
1153 /* Called from I/O thread context */
1154 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1155     struct userdata *u;
1156     pa_usec_t latency;
1157
1158     pa_source_output_assert_ref(o);
1159     pa_assert_se(u = o->userdata);
1160
1161     latency = pa_source_get_requested_latency_within_thread(o->source);
1162
1163     pa_log_debug("source output update requested latency %lld", (long long) latency);
1164 }
1165
1166 /* Called from I/O thread context */
1167 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1168     struct userdata *u;
1169
1170     pa_sink_input_assert_ref(i);
1171     pa_assert_se(u = i->userdata);
1172
1173     pa_log_debug("Sink input update latency range %lld %lld",
1174         (long long) i->sink->thread_info.min_latency,
1175         (long long) i->sink->thread_info.max_latency);
1176
1177     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1178 }
1179
1180 /* Called from I/O thread context */
1181 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1182     struct userdata *u;
1183
1184     pa_source_output_assert_ref(o);
1185     pa_assert_se(u = o->userdata);
1186
1187     pa_log_debug("Source output update latency range %lld %lld",
1188         (long long) o->source->thread_info.min_latency,
1189         (long long) o->source->thread_info.max_latency);
1190
1191     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1192 }
1193
1194 /* Called from I/O thread context */
1195 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1196     struct userdata *u;
1197
1198     pa_sink_input_assert_ref(i);
1199     pa_assert_se(u = i->userdata);
1200
1201     pa_log_debug("Sink input update fixed latency %lld",
1202         (long long) i->sink->thread_info.fixed_latency);
1203
1204     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1205 }
1206
1207 /* Called from I/O thread context */
1208 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1209     struct userdata *u;
1210
1211     pa_source_output_assert_ref(o);
1212     pa_assert_se(u = o->userdata);
1213
1214     pa_log_debug("Source output update fixed latency %lld",
1215         (long long) o->source->thread_info.fixed_latency);
1216
1217     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1218 }
1219
1220 /* Called from output thread context */
1221 static void source_output_attach_cb(pa_source_output *o) {
1222     struct userdata *u;
1223
1224     pa_source_output_assert_ref(o);
1225     pa_source_output_assert_io_context(o);
1226     pa_assert_se(u = o->userdata);
1227
1228     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1229     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1230     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1231     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1232
1233     pa_log_debug("Source output %d attach", o->index);
1234
1235     pa_source_attach_within_thread(u->source);
1236
1237     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1238             o->source->thread_info.rtpoll,
1239             PA_RTPOLL_LATE,
1240             u->asyncmsgq);
1241 }
1242
1243 /* Called from I/O thread context */
1244 static void sink_input_attach_cb(pa_sink_input *i) {
1245     struct userdata *u;
1246
1247     pa_sink_input_assert_ref(i);
1248     pa_assert_se(u = i->userdata);
1249
1250     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1251     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1252
1253     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1254      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1255     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1256
1257     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1258      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1259      * HERE. SEE (6) */
1260     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1261     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1262
1263     pa_log_debug("Sink input %d attach", i->index);
1264
1265     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1266             i->sink->thread_info.rtpoll,
1267             PA_RTPOLL_LATE,
1268             u->asyncmsgq);
1269
1270     pa_sink_attach_within_thread(u->sink);
1271 }
1272
1273
1274 /* Called from output thread context */
1275 static void source_output_detach_cb(pa_source_output *o) {
1276     struct userdata *u;
1277
1278     pa_source_output_assert_ref(o);
1279     pa_source_output_assert_io_context(o);
1280     pa_assert_se(u = o->userdata);
1281
1282     pa_source_detach_within_thread(u->source);
1283     pa_source_set_rtpoll(u->source, NULL);
1284
1285     pa_log_debug("Source output %d detach", o->index);
1286
1287     if (u->rtpoll_item_read) {
1288         pa_rtpoll_item_free(u->rtpoll_item_read);
1289         u->rtpoll_item_read = NULL;
1290     }
1291 }
1292
1293 /* Called from I/O thread context */
1294 static void sink_input_detach_cb(pa_sink_input *i) {
1295     struct userdata *u;
1296
1297     pa_sink_input_assert_ref(i);
1298     pa_assert_se(u = i->userdata);
1299
1300     pa_sink_detach_within_thread(u->sink);
1301
1302     pa_sink_set_rtpoll(u->sink, NULL);
1303
1304     pa_log_debug("Sink input %d detach", i->index);
1305
1306     if (u->rtpoll_item_write) {
1307         pa_rtpoll_item_free(u->rtpoll_item_write);
1308         u->rtpoll_item_write = NULL;
1309     }
1310 }
1311
1312 /* Called from output thread context */
1313 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1314     struct userdata *u;
1315
1316     pa_source_output_assert_ref(o);
1317     pa_source_output_assert_io_context(o);
1318     pa_assert_se(u = o->userdata);
1319
1320     pa_log_debug("Source output %d state %d", o->index, state);
1321 }
1322
1323 /* Called from IO thread context */
1324 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1325     struct userdata *u;
1326
1327     pa_sink_input_assert_ref(i);
1328     pa_assert_se(u = i->userdata);
1329
1330     pa_log_debug("Sink input %d state %d", i->index, state);
1331
1332     /* If we are added for the first time, ask for a rewinding so that
1333      * we are heard right-away. */
1334     if (PA_SINK_INPUT_IS_LINKED(state) &&
1335         i->thread_info.state == PA_SINK_INPUT_INIT) {
1336         pa_log_debug("Requesting rewind due to state change.");
1337         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1338     }
1339 }
1340
1341 /* Called from main thread */
1342 static void source_output_kill_cb(pa_source_output *o) {
1343     struct userdata *u;
1344
1345     pa_source_output_assert_ref(o);
1346     pa_assert_ctl_context();
1347     pa_assert_se(u = o->userdata);
1348
1349     u->dead = TRUE;
1350
1351     /* The order here matters! We first kill the source output, followed
1352      * by the source. That means the source callbacks must be protected
1353      * against an unconnected source output! */
1354     pa_source_output_unlink(u->source_output);
1355     pa_source_unlink(u->source);
1356
1357     pa_source_output_unref(u->source_output);
1358     u->source_output = NULL;
1359
1360     pa_source_unref(u->source);
1361     u->source = NULL;
1362
1363     pa_log_debug("Source output kill %d", o->index);
1364
1365     pa_module_unload_request(u->module, TRUE);
1366 }
1367
1368 /* Called from main context */
1369 static void sink_input_kill_cb(pa_sink_input *i) {
1370     struct userdata *u;
1371
1372     pa_sink_input_assert_ref(i);
1373     pa_assert_se(u = i->userdata);
1374
1375     u->dead = TRUE;
1376
1377     /* The order here matters! We first kill the sink input, followed
1378      * by the sink. That means the sink callbacks must be protected
1379      * against an unconnected sink input! */
1380     pa_sink_input_unlink(u->sink_input);
1381     pa_sink_unlink(u->sink);
1382
1383     pa_sink_input_unref(u->sink_input);
1384     u->sink_input = NULL;
1385
1386     pa_sink_unref(u->sink);
1387     u->sink = NULL;
1388
1389     pa_log_debug("Sink input kill %d", i->index);
1390
1391     pa_module_unload_request(u->module, TRUE);
1392 }
1393
1394 /* Called from main thread */
1395 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1396     struct userdata *u;
1397
1398     pa_source_output_assert_ref(o);
1399     pa_assert_ctl_context();
1400     pa_assert_se(u = o->userdata);
1401
1402     if (u->dead)
1403         return FALSE;
1404
1405     return (u->source != dest) && (u->sink != dest->monitor_of);
1406 }
1407
1408 /* Called from main context */
1409 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1410     struct userdata *u;
1411
1412     pa_sink_input_assert_ref(i);
1413     pa_assert_se(u = i->userdata);
1414
1415     if (u->dead)
1416         return FALSE;
1417
1418     return u->sink != dest;
1419 }
1420
1421 /* Called from main thread */
1422 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1423     struct userdata *u;
1424
1425     pa_source_output_assert_ref(o);
1426     pa_assert_ctl_context();
1427     pa_assert_se(u = o->userdata);
1428
1429     if (dest) {
1430         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1431         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1432     } else
1433         pa_source_set_asyncmsgq(u->source, NULL);
1434
1435     if (u->source_auto_desc && dest) {
1436         const char *y, *z;
1437         pa_proplist *pl;
1438
1439         pl = pa_proplist_new();
1440         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1441         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1442         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1443                 y ? y : u->sink_input->sink->name);
1444
1445         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1446         pa_proplist_free(pl);
1447     }
1448 }
1449
1450 /* Called from main context */
1451 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1452     struct userdata *u;
1453
1454     pa_sink_input_assert_ref(i);
1455     pa_assert_se(u = i->userdata);
1456
1457     if (dest) {
1458         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1459         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1460     } else
1461         pa_sink_set_asyncmsgq(u->sink, NULL);
1462
1463     if (u->sink_auto_desc && dest) {
1464         const char *y, *z;
1465         pa_proplist *pl;
1466
1467         pl = pa_proplist_new();
1468         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1469         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1470         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1471                          y ? y : u->source_output->source->name);
1472
1473         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1474         pa_proplist_free(pl);
1475     }
1476 }
1477
1478 /* Called from main context */
1479 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1480     struct userdata *u;
1481
1482     pa_sink_input_assert_ref(i);
1483     pa_assert_se(u = i->userdata);
1484
1485     pa_sink_volume_changed(u->sink, &i->volume);
1486 }
1487
1488 /* Called from main context */
1489 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1490     struct userdata *u;
1491
1492     pa_sink_input_assert_ref(i);
1493     pa_assert_se(u = i->userdata);
1494
1495     pa_sink_mute_changed(u->sink, i->muted);
1496 }
1497
1498 /* Called from main context */
1499 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1500     struct pa_echo_canceller_msg *msg;
1501     struct userdata *u;
1502
1503     pa_assert(o);
1504
1505     msg = PA_ECHO_CANCELLER_MSG(o);
1506     u = msg->userdata;
1507
1508     switch (code) {
1509         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1510             pa_cvolume *v = (pa_cvolume *) userdata;
1511
1512             if (u->use_volume_sharing)
1513                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1514             else
1515                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1516
1517             break;
1518         }
1519
1520         default:
1521             pa_assert_not_reached();
1522             break;
1523     }
1524
1525     return 0;
1526 }
1527
1528 /* Called by the canceller, so thread context */
1529 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1530     *v = ec->msg->userdata->thread_info.current_volume;
1531 }
1532
1533 /* Called by the canceller, so thread context */
1534 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1535     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1536         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1537
1538         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1539                 pa_xfree);
1540     }
1541 }
1542
1543 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1544     if (pa_streq(method, "speex"))
1545         return PA_ECHO_CANCELLER_SPEEX;
1546     else if (pa_streq(method, "adrian"))
1547         return PA_ECHO_CANCELLER_ADRIAN;
1548 #ifdef HAVE_WEBRTC
1549     else if (pa_streq(method, "webrtc"))
1550         return PA_ECHO_CANCELLER_WEBRTC;
1551 #endif
1552     else
1553         return PA_ECHO_CANCELLER_INVALID;
1554 }
1555
1556 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1557 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1558     pa_echo_canceller_method_t ec_method;
1559
1560     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1561         pa_log("Invalid sample format specification or channel map");
1562         goto fail;
1563     }
1564
1565     u->ec = pa_xnew0(pa_echo_canceller, 1);
1566     if (!u->ec) {
1567         pa_log("Failed to alloc echo canceller");
1568         goto fail;
1569     }
1570
1571     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1572         pa_log("Invalid echo canceller implementation");
1573         goto fail;
1574     }
1575
1576     u->ec->init = ec_table[ec_method].init;
1577     u->ec->play = ec_table[ec_method].play;
1578     u->ec->record = ec_table[ec_method].record;
1579     u->ec->set_drift = ec_table[ec_method].set_drift;
1580     u->ec->run = ec_table[ec_method].run;
1581     u->ec->done = ec_table[ec_method].done;
1582
1583     return 0;
1584
1585 fail:
1586     return -1;
1587 }
1588
1589
1590 int pa__init(pa_module*m) {
1591     struct userdata *u;
1592     pa_sample_spec source_ss, sink_ss;
1593     pa_channel_map source_map, sink_map;
1594     pa_modargs *ma;
1595     pa_source *source_master=NULL;
1596     pa_sink *sink_master=NULL;
1597     pa_source_output_new_data source_output_data;
1598     pa_sink_input_new_data sink_input_data;
1599     pa_source_new_data source_data;
1600     pa_sink_new_data sink_data;
1601     pa_memchunk silence;
1602     uint32_t temp;
1603
1604     pa_assert(m);
1605
1606     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1607         pa_log("Failed to parse module arguments.");
1608         goto fail;
1609     }
1610
1611     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1612         pa_log("Master source not found");
1613         goto fail;
1614     }
1615     pa_assert(source_master);
1616
1617     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1618         pa_log("Master sink not found");
1619         goto fail;
1620     }
1621     pa_assert(sink_master);
1622
1623     if (source_master->monitor_of == sink_master) {
1624         pa_log("Can't cancel echo between a sink and its monitor");
1625         goto fail;
1626     }
1627
1628     source_ss = source_master->sample_spec;
1629     source_ss.rate = DEFAULT_RATE;
1630     source_ss.channels = DEFAULT_CHANNELS;
1631     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1632
1633     sink_ss = sink_master->sample_spec;
1634     sink_map = sink_master->channel_map;
1635
1636     u = pa_xnew0(struct userdata, 1);
1637     if (!u) {
1638         pa_log("Failed to alloc userdata");
1639         goto fail;
1640     }
1641     u->core = m->core;
1642     u->module = m;
1643     m->userdata = u;
1644     u->dead = FALSE;
1645
1646     u->use_volume_sharing = TRUE;
1647     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1648         pa_log("use_volume_sharing= expects a boolean argument");
1649         goto fail;
1650     }
1651
1652     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1653     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1654         pa_log("Failed to parse adjust_time value");
1655         goto fail;
1656     }
1657
1658     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1659         u->adjust_time = temp * PA_USEC_PER_SEC;
1660     else
1661         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1662
1663     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1664     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1665         pa_log("Failed to parse adjust_threshold value");
1666         goto fail;
1667     }
1668
1669     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1670         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1671     else
1672         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1673
1674     u->save_aec = DEFAULT_SAVE_AEC;
1675     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1676         pa_log("Failed to parse save_aec value");
1677         goto fail;
1678     }
1679
1680     u->autoloaded = DEFAULT_AUTOLOADED;
1681     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1682         pa_log("Failed to parse autoloaded value");
1683         goto fail;
1684     }
1685
1686     if (init_common(ma, u, &source_ss, &source_map))
1687         goto fail;
1688
1689     u->asyncmsgq = pa_asyncmsgq_new(0);
1690     u->need_realign = TRUE;
1691
1692     if (u->ec->init) {
1693         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1694             pa_log("Failed to init AEC engine");
1695             goto fail;
1696         }
1697     }
1698
1699     if (u->ec->params.drift_compensation)
1700         pa_assert(u->ec->set_drift);
1701
1702     /* Create source */
1703     pa_source_new_data_init(&source_data);
1704     source_data.driver = __FILE__;
1705     source_data.module = m;
1706     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1707         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1708     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1709     pa_source_new_data_set_channel_map(&source_data, &source_map);
1710     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1711     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1712     if (!u->autoloaded)
1713         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1714
1715     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1716         pa_log("Invalid properties");
1717         pa_source_new_data_done(&source_data);
1718         goto fail;
1719     }
1720
1721     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1722         const char *y, *z;
1723
1724         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1725         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1726         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1727                 z ? z : source_master->name, y ? y : sink_master->name);
1728     }
1729
1730     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1731                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1732     pa_source_new_data_done(&source_data);
1733
1734     if (!u->source) {
1735         pa_log("Failed to create source.");
1736         goto fail;
1737     }
1738
1739     u->source->parent.process_msg = source_process_msg_cb;
1740     u->source->set_state = source_set_state_cb;
1741     u->source->update_requested_latency = source_update_requested_latency_cb;
1742     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1743     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1744     if (!u->use_volume_sharing) {
1745         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1746         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1747         pa_source_enable_decibel_volume(u->source, TRUE);
1748     }
1749     u->source->userdata = u;
1750
1751     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1752
1753     /* Create sink */
1754     pa_sink_new_data_init(&sink_data);
1755     sink_data.driver = __FILE__;
1756     sink_data.module = m;
1757     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1758         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1759     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1760     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1761     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1762     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1763     if (!u->autoloaded)
1764         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1765
1766     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1767         pa_log("Invalid properties");
1768         pa_sink_new_data_done(&sink_data);
1769         goto fail;
1770     }
1771
1772     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1773         const char *y, *z;
1774
1775         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1776         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1777         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1778                 z ? z : sink_master->name, y ? y : source_master->name);
1779     }
1780
1781     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1782                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1783     pa_sink_new_data_done(&sink_data);
1784
1785     if (!u->sink) {
1786         pa_log("Failed to create sink.");
1787         goto fail;
1788     }
1789
1790     u->sink->parent.process_msg = sink_process_msg_cb;
1791     u->sink->set_state = sink_set_state_cb;
1792     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1793     u->sink->request_rewind = sink_request_rewind_cb;
1794     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1795     if (!u->use_volume_sharing) {
1796         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1797         pa_sink_enable_decibel_volume(u->sink, TRUE);
1798     }
1799     u->sink->userdata = u;
1800
1801     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1802
1803     /* Create source output */
1804     pa_source_output_new_data_init(&source_output_data);
1805     source_output_data.driver = __FILE__;
1806     source_output_data.module = m;
1807     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1808     source_output_data.destination_source = u->source;
1809     /* FIXME
1810        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1811
1812     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1813     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1814     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1815     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1816
1817     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1818     pa_source_output_new_data_done(&source_output_data);
1819
1820     if (!u->source_output)
1821         goto fail;
1822
1823     u->source_output->parent.process_msg = source_output_process_msg_cb;
1824     u->source_output->push = source_output_push_cb;
1825     u->source_output->process_rewind = source_output_process_rewind_cb;
1826     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1827     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1828     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1829     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1830     u->source_output->kill = source_output_kill_cb;
1831     u->source_output->attach = source_output_attach_cb;
1832     u->source_output->detach = source_output_detach_cb;
1833     u->source_output->state_change = source_output_state_change_cb;
1834     u->source_output->may_move_to = source_output_may_move_to_cb;
1835     u->source_output->moving = source_output_moving_cb;
1836     u->source_output->userdata = u;
1837
1838     u->source->output_from_master = u->source_output;
1839
1840     /* Create sink input */
1841     pa_sink_input_new_data_init(&sink_input_data);
1842     sink_input_data.driver = __FILE__;
1843     sink_input_data.module = m;
1844     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1845     sink_input_data.origin_sink = u->sink;
1846     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1847     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1848     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1849     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1850     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1851
1852     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1853     pa_sink_input_new_data_done(&sink_input_data);
1854
1855     if (!u->sink_input)
1856         goto fail;
1857
1858     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1859     u->sink_input->pop = sink_input_pop_cb;
1860     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1861     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1862     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1863     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1864     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1865     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1866     u->sink_input->kill = sink_input_kill_cb;
1867     u->sink_input->attach = sink_input_attach_cb;
1868     u->sink_input->detach = sink_input_detach_cb;
1869     u->sink_input->state_change = sink_input_state_change_cb;
1870     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1871     u->sink_input->moving = sink_input_moving_cb;
1872     if (!u->use_volume_sharing)
1873         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1874     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1875     u->sink_input->userdata = u;
1876
1877     u->sink->input_to_master = u->sink_input;
1878
1879     pa_sink_input_get_silence(u->sink_input, &silence);
1880
1881     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1882         &source_ss, 1, 1, 0, &silence);
1883     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1884         &sink_ss, 1, 1, 0, &silence);
1885
1886     pa_memblock_unref(silence.memblock);
1887
1888     if (!u->source_memblockq || !u->sink_memblockq) {
1889         pa_log("Failed to create memblockq.");
1890         goto fail;
1891     }
1892
1893     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1894         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1895     else if (u->ec->params.drift_compensation) {
1896         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1897         u->adjust_time = 0;
1898         /* Perform resync just once to give the canceller a leg up */
1899         pa_atomic_store(&u->request_resync, 1);
1900     }
1901
1902     if (u->save_aec) {
1903         pa_log("Creating AEC files in /tmp");
1904         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1905         if (u->captured_file == NULL)
1906             perror ("fopen failed");
1907         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1908         if (u->played_file == NULL)
1909             perror ("fopen failed");
1910         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1911         if (u->canceled_file == NULL)
1912             perror ("fopen failed");
1913         if (u->ec->params.drift_compensation) {
1914             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1915             if (u->drift_file == NULL)
1916                 perror ("fopen failed");
1917         }
1918     }
1919
1920     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1921     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1922     u->ec->msg->userdata = u;
1923
1924     u->thread_info.current_volume = u->source->reference_volume;
1925
1926     pa_sink_put(u->sink);
1927     pa_source_put(u->source);
1928
1929     pa_sink_input_put(u->sink_input);
1930     pa_source_output_put(u->source_output);
1931     pa_modargs_free(ma);
1932
1933     return 0;
1934
1935 fail:
1936     if (ma)
1937         pa_modargs_free(ma);
1938
1939     pa__done(m);
1940
1941     return -1;
1942 }
1943
1944 int pa__get_n_used(pa_module *m) {
1945     struct userdata *u;
1946
1947     pa_assert(m);
1948     pa_assert_se(u = m->userdata);
1949
1950     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1951 }
1952
1953 void pa__done(pa_module*m) {
1954     struct userdata *u;
1955
1956     pa_assert(m);
1957
1958     if (!(u = m->userdata))
1959         return;
1960
1961     u->dead = TRUE;
1962
1963     /* See comments in source_output_kill_cb() above regarding
1964      * destruction order! */
1965
1966     if (u->time_event)
1967         u->core->mainloop->time_free(u->time_event);
1968
1969     if (u->source_output)
1970         pa_source_output_unlink(u->source_output);
1971     if (u->sink_input)
1972         pa_sink_input_unlink(u->sink_input);
1973
1974     if (u->source)
1975         pa_source_unlink(u->source);
1976     if (u->sink)
1977         pa_sink_unlink(u->sink);
1978
1979     if (u->source_output)
1980         pa_source_output_unref(u->source_output);
1981     if (u->sink_input)
1982         pa_sink_input_unref(u->sink_input);
1983
1984     if (u->source)
1985         pa_source_unref(u->source);
1986     if (u->sink)
1987         pa_sink_unref(u->sink);
1988
1989     if (u->source_memblockq)
1990         pa_memblockq_free(u->source_memblockq);
1991     if (u->sink_memblockq)
1992         pa_memblockq_free(u->sink_memblockq);
1993
1994     if (u->ec) {
1995         if (u->ec->done)
1996             u->ec->done(u->ec);
1997
1998         pa_xfree(u->ec);
1999     }
2000
2001     if (u->asyncmsgq)
2002         pa_asyncmsgq_unref(u->asyncmsgq);
2003
2004     if (u->save_aec) {
2005         if (u->played_file)
2006             fclose(u->played_file);
2007         if (u->captured_file)
2008             fclose(u->captured_file);
2009         if (u->canceled_file)
2010             fclose(u->canceled_file);
2011         if (u->drift_file)
2012             fclose(u->drift_file);
2013     }
2014
2015     pa_xfree(u);
2016 }
2017
#ifdef ECHO_CANCEL_TEST
/*
 * Stand-alone test program for running the canceller on pre-recorded files.
 *
 * Arguments: play_file rec_file out_file [module args] [aec_args] [drift_file]
 *
 * Without drift compensation the played and captured files are consumed in
 * lock-step, one block at a time. With drift compensation the drift file
 * drives the run: each 'd' record sets the drift, each 'c' record feeds a
 * captured chunk (and writes the cancelled output), each 'p' record feeds a
 * played chunk.
 *
 * Returns 0 on success, -1 on a usage error or any failure.
 */
int main(int argc, char* argv[]) {
    struct userdata u;
    pa_sample_spec source_ss, sink_ss;
    pa_channel_map source_map, sink_map;
    pa_modargs *ma = NULL;
    uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
    int unused PA_GCC_UNUSED;
    int ret = 0, i = 0;
    pa_bool_t ec_ready = FALSE;
    char c;
    float drift;

    /* Zeroing u also guarantees that all FILE pointers start out NULL,
     * which the common cleanup code below relies on. */
    pa_memzero(&u, sizeof(u));

    if (argc < 4 || argc > 7) {
        goto usage;
    }

    u.ec = pa_xnew0(pa_echo_canceller, 1);
    if (!u.ec) {
        pa_log("Failed to alloc echo canceller");
        goto fail;
    }

    /* The sample files hold raw audio, so open them in binary mode. */
    u.captured_file = fopen(argv[2], "rb");
    if (u.captured_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.played_file = fopen(argv[1], "rb");
    if (u.played_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.canceled_file = fopen(argv[3], "wb");
    if (u.canceled_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }

    u.core = pa_xnew0(pa_core, 1);
    u.core->cpu_info.cpu_type = PA_CPU_X86;
    u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;

    if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    source_ss.format = PA_SAMPLE_S16LE;
    source_ss.rate = DEFAULT_RATE;
    source_ss.channels = DEFAULT_CHANNELS;
    pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);

    /* NOTE(review): the result of init_common() is not checked here, and
     * u.ec->init is assumed to be set by it -- confirm against its
     * definition. */
    init_common(ma, &u, &source_ss, &source_map);

    /* There is no master sink to take a format from in the test program.
     * Start the playback side from the capture spec instead of handing
     * init() uninitialized structures. */
    sink_ss = source_ss;
    sink_map = source_map;

    if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
                     (argc > 5) ? argv[5] : NULL )) {
        pa_log("Failed to init AEC engine");
        goto fail;
    }

    ec_ready = TRUE;

    if (u.ec->params.drift_compensation) {
        if (argc < 7) {
            pa_log("Drift compensation enabled but drift file not specified");
            goto fail;
        }

        u.drift_file = fopen(argv[6], "r");

        if (u.drift_file == NULL) {
            perror ("fopen failed");
            goto fail;
        }
    }

    rdata = pa_xmalloc(u.blocksize);
    pdata = pa_xmalloc(u.blocksize);
    cdata = pa_xmalloc(u.blocksize);

    if (!u.ec->params.drift_compensation) {
        while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
            if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
                perror("Played file ended before captured file");
                goto fail;
            }

            u.ec->run(u.ec, rdata, pdata, cdata);

            unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
        }
    } else {
        while (fscanf(u.drift_file, "%c", &c) > 0) {
            switch (c) {
                case 'd':
                    /* fscanf() returns EOF (non-zero) at end of file, so
                     * compare against the expected item count. */
                    if (fscanf(u.drift_file, "%a", &drift) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    u.ec->set_drift(u.ec, drift);

                    break;

                case 'c':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    /* The buffers hold exactly one block; refuse sizes
                     * that would overrun them. */
                    if (i <= 0 || (size_t) i > (size_t) u.blocksize) {
                        pa_log("Invalid chunk size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(rdata, i, 1, u.captured_file) == 0) {
                        perror("Captured file ended prematurely");
                        goto fail;
                    }

                    u.ec->record(u.ec, rdata, cdata);

                    unused = fwrite(cdata, i, 1, u.canceled_file);

                    break;

                case 'p':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    if (i <= 0 || (size_t) i > (size_t) u.blocksize) {
                        pa_log("Invalid chunk size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(pdata, i, 1, u.played_file) == 0) {
                        perror("Played file ended prematurely");
                        goto fail;
                    }

                    u.ec->play(u.ec, pdata);

                    break;

                default:
                    /* "%c" also returns the separators (spaces, newlines)
                     * between records; skip them. */
                    break;
            }
        }

        /* i is the size of the last chunk processed; it stays 0 if the
         * drift file contained no 'c'/'p' records at all. */
        if (i > 0) {
            if (fread(rdata, i, 1, u.captured_file) > 0)
                pa_log("All capture data was not consumed");
            if (fread(pdata, i, 1, u.played_file) > 0)
                pa_log("All playback data was not consumed");
        }
    }

out:
    /* Common cleanup for the success and failure paths alike. */
    if (ec_ready && u.ec->done)
        u.ec->done(u.ec);

    if (u.captured_file)
        fclose(u.captured_file);
    if (u.played_file)
        fclose(u.played_file);
    if (u.canceled_file)
        fclose(u.canceled_file);
    if (u.drift_file)
        fclose(u.drift_file);

    pa_xfree(rdata);
    pa_xfree(pdata);
    pa_xfree(cdata);

    pa_xfree(u.ec);
    pa_xfree(u.core);

    if (ma)
        pa_modargs_free(ma);

    return ret;

usage:
    pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);

fail:
    ret = -1;
    goto out;
}
#endif /* ECHO_CANCEL_TEST */