9f3e85c96ed08360cb20f327e16eb56a18d09e51
[profile/ivi/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
58 PA_MODULE_AUTHOR("Wim Taymans");
59 PA_MODULE_DESCRIPTION("Echo Cancellation");
60 PA_MODULE_VERSION(PACKAGE_VERSION);
61 PA_MODULE_LOAD_ONCE(FALSE);
62 PA_MODULE_USAGE(
63         _("source_name=<name for the source> "
64           "source_properties=<properties for the source> "
65           "source_master=<name of source to filter> "
66           "sink_name=<name for the sink> "
67           "sink_properties=<properties for the sink> "
68           "sink_master=<name of sink to filter> "
69           "adjust_time=<how often to readjust rates in s> "
70           "adjust_threshold=<how much drift to readjust after in ms> "
71           "format=<sample format> "
72           "rate=<sample rate> "
73           "channels=<number of channels> "
74           "channel_map=<channel map> "
75           "aec_method=<implementation to use> "
76           "aec_args=<parameters for the AEC engine> "
77           "save_aec=<save AEC data in /tmp> "
78           "autoloaded=<set if this module is being loaded automatically> "
79           "use_volume_sharing=<yes or no> "
80         ));
81
/* Identifiers for the echo canceller backends that were compiled in.
 *
 * NOTE: keep the enumerators and the entries of ec_table in the same order
 * and under the same #ifdef guards. */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,

#ifdef HAVE_SPEEX
    PA_ECHO_CANCELLER_SPEEX,
#endif

    PA_ECHO_CANCELLER_ADRIAN,

#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC,
#endif
} pa_echo_canceller_method_t;
93
/* Name of the backend used when no aec_method is given. Pick one that is
 * actually compiled in: WebRTC if available, then Speex, and finally the
 * always-built Adrian implementation.
 * NOTE(review): "adrian" is assumed to be the method name accepted by the
 * aec_method parser, which is outside this excerpt -- confirm. */
#if defined(HAVE_WEBRTC)
#define DEFAULT_ECHO_CANCELLER "webrtc"
#elif defined(HAVE_SPEEX)
#define DEFAULT_ECHO_CANCELLER "speex"
#else
#define DEFAULT_ECHO_CANCELLER "adrian"
#endif
99
100 static const pa_echo_canceller ec_table[] = {
101 #ifdef HAVE_SPEEX
102     {
103         /* Speex */
104         .init                   = pa_speex_ec_init,
105         .run                    = pa_speex_ec_run,
106         .done                   = pa_speex_ec_done,
107     },
108 #endif
109     {
110         /* Adrian Andre's NLMS implementation */
111         .init                   = pa_adrian_ec_init,
112         .run                    = pa_adrian_ec_run,
113         .done                   = pa_adrian_ec_done,
114     },
115 #ifdef HAVE_WEBRTC
116     {
117         /* WebRTC's audio processing engine */
118         .init                   = pa_webrtc_ec_init,
119         .play                   = pa_webrtc_ec_play,
120         .record                 = pa_webrtc_ec_record,
121         .set_drift              = pa_webrtc_ec_set_drift,
122         .run                    = pa_webrtc_ec_run,
123         .done                   = pa_webrtc_ec_done,
124     },
125 #endif
126 };
127
/* Built-in defaults. NOTE(review): presumably applied when the matching
 * module argument is absent; the consuming code is outside this excerpt. */

/* Stream format */
#define DEFAULT_RATE 32000
#define DEFAULT_CHANNELS 1

/* Drift adjustment: check interval and tolerated offset, both in usec */
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)

/* Boolean switches */
#define DEFAULT_SAVE_AEC FALSE
#define DEFAULT_AUTOLOADED FALSE

/* 16 MiB */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
136
/* True when both the virtual source and the virtual sink are RUNNING.
 * Can only be used in main context. */
#define IS_ACTIVE(u)                                                    \
    ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) &&        \
     (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
140
141 /* This module creates a new (virtual) source and sink.
142  *
143  * The data sent to the new sink is kept in a memblockq before being
144  * forwarded to the real sink_master.
145  *
146  * Data read from source_master is matched against the saved sink data and
147  * echo canceled data is then pushed onto the new source.
148  *
149  * Both source and sink masters have their own threads to push/pull data
150  * respectively. We however perform all our actions in the source IO thread.
151  * To do this we send all played samples to the source IO thread where they
152  * are then pushed into the memblockq.
153  *
154  * Alignment is performed in two steps:
155  *
156  * 1) when something happens that requires quick adjustment of the alignment of
157  *    capture and playback samples, we perform a resync. This adjusts the
158  *    position in the playback memblock to the requested sample. Quick
159  *    adjustments include moving the playback samples before the capture
160  *    samples (because else the echo canceler does not work) or when the
161  *    playback pointer drifts too far away.
162  *
163  * 2) Periodically check the difference between capture and playback. We use a
164  *    low and high watermark for adjusting the alignment. Playback should always
165  *    be before capture and the difference should not be bigger than one frame
166  *    size. We would ideally like to resample the sink_input but most drivers
167  *    don't give enough accuracy to be able to do that right now.
168  */
169
170 struct userdata;
171
172 struct pa_echo_canceller_msg {
173     pa_msgobject parent;
174     struct userdata *userdata;
175 };
176
177 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
178 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
179
180 struct snapshot {
181     pa_usec_t sink_now;
182     pa_usec_t sink_latency;
183     size_t sink_delay;
184     int64_t send_counter;
185
186     pa_usec_t source_now;
187     pa_usec_t source_latency;
188     size_t source_delay;
189     int64_t recv_counter;
190     size_t rlen;
191     size_t plen;
192 };
193
194 struct userdata {
195     pa_core *core;
196     pa_module *module;
197
198     pa_bool_t autoloaded;
199     pa_bool_t dead;
200     pa_bool_t save_aec;
201
202     pa_echo_canceller *ec;
203     uint32_t blocksize;
204
205     pa_bool_t need_realign;
206
207     /* to wakeup the source I/O thread */
208     pa_asyncmsgq *asyncmsgq;
209     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
210
211     pa_source *source;
212     pa_bool_t source_auto_desc;
213     pa_source_output *source_output;
214     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
215     size_t source_skip;
216
217     pa_sink *sink;
218     pa_bool_t sink_auto_desc;
219     pa_sink_input *sink_input;
220     pa_memblockq *sink_memblockq;
221     int64_t send_counter;          /* updated in sink IO thread */
222     int64_t recv_counter;
223     size_t sink_skip;
224
225     /* Bytes left over from previous iteration */
226     size_t sink_rem;
227     size_t source_rem;
228
229     pa_atomic_t request_resync;
230
231     pa_time_event *time_event;
232     pa_usec_t adjust_time;
233     int adjust_threshold;
234
235     FILE *captured_file;
236     FILE *played_file;
237     FILE *canceled_file;
238     FILE *drift_file;
239
240     pa_bool_t use_volume_sharing;
241
242     struct {
243         pa_cvolume current_volume;
244     } thread_info;
245 };
246
/* Forward declaration: needed by do_resync(), defined later in the file. */
static void source_output_snapshot_within_thread(struct userdata *u,
                                                 struct snapshot *snapshot);
248
/* NULL-terminated list of the module argument names that are accepted. */
static const char* const valid_modargs[] = {
    /* capture side */
    "source_name", "source_properties", "source_master",
    /* playback side */
    "sink_name", "sink_properties", "sink_master",
    /* drift adjustment */
    "adjust_time", "adjust_threshold",
    /* sample spec */
    "format", "rate", "channels", "channel_map",
    /* canceller selection and debugging */
    "aec_method", "aec_args", "save_aec",
    /* misc */
    "autoloaded", "use_volume_sharing",
    NULL
};
269
270 enum {
271     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
272     SOURCE_OUTPUT_MESSAGE_REWIND,
273     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
274     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
275 };
276
277 enum {
278     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
279 };
280
281 enum {
282     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
283 };
284
285 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
286     int64_t buffer, diff_time, buffer_latency;
287
288     /* get the number of samples between capture and playback */
289     if (snapshot->plen > snapshot->rlen)
290         buffer = snapshot->plen - snapshot->rlen;
291     else
292         buffer = 0;
293
294     buffer += snapshot->source_delay + snapshot->sink_delay;
295
296     /* add the amount of samples not yet transferred to the source context */
297     if (snapshot->recv_counter <= snapshot->send_counter)
298         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
299     else
300         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
301
302     /* convert to time */
303     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
304
305     /* capture and playback samples are perfectly aligned when diff_time is 0 */
306     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
307           (snapshot->source_now - snapshot->source_latency);
308
309     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
310         (long long) snapshot->sink_latency,
311         (long long) buffer_latency, (long long) snapshot->source_latency,
312         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
313         (long long) (snapshot->send_counter - snapshot->recv_counter),
314         (long long) (snapshot->sink_now - snapshot->source_now));
315
316     return diff_time;
317 }
318
319 /* Called from main context */
320 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
321     struct userdata *u = userdata;
322     uint32_t old_rate, base_rate, new_rate;
323     int64_t diff_time;
324     /*size_t fs*/
325     struct snapshot latency_snapshot;
326
327     pa_assert(u);
328     pa_assert(a);
329     pa_assert(u->time_event == e);
330     pa_assert_ctl_context();
331
332     if (!IS_ACTIVE(u))
333         return;
334
335     /* update our snapshots */
336     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
337     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
338
339     /* calculate drift between capture and playback */
340     diff_time = calc_diff(u, &latency_snapshot);
341
342     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
343     old_rate = u->sink_input->sample_spec.rate;
344     base_rate = u->source_output->sample_spec.rate;
345
346     if (diff_time < 0) {
347         /* recording before playback, we need to adjust quickly. The echo
348          * canceler does not work in this case. */
349         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
350             NULL, diff_time, NULL, NULL);
351         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
352         new_rate = base_rate;
353     }
354     else {
355         if (diff_time > u->adjust_threshold) {
356             /* diff too big, quickly adjust */
357             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
358                 NULL, diff_time, NULL, NULL);
359         }
360
361         /* recording behind playback, we need to slowly adjust the rate to match */
362         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
363
364         /* assume equal samplerates for now */
365         new_rate = base_rate;
366     }
367
368     /* make sure we don't make too big adjustments because that sounds horrible */
369     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
370         new_rate = base_rate;
371
372     if (new_rate != old_rate) {
373         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
374
375         pa_sink_input_set_rate(u->sink_input, new_rate);
376     }
377
378     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
379 }
380
381 /* Called from source I/O thread context */
382 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
383     struct userdata *u = PA_SOURCE(o)->userdata;
384
385     switch (code) {
386
387         case PA_SOURCE_MESSAGE_GET_LATENCY:
388
389             /* The source is _put() before the source output is, so let's
390              * make sure we don't access it in that time. Also, the
391              * source output is first shut down, the source second. */
392             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
393                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
394                 *((pa_usec_t*) data) = 0;
395                 return 0;
396             }
397
398             *((pa_usec_t*) data) =
399
400                 /* Get the latency of the master source */
401                 pa_source_get_latency_within_thread(u->source_output->source) +
402                 /* Add the latency internal to our source output on top */
403                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
404                 /* and the buffering we do on the source */
405                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
406
407             return 0;
408
409         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
410             u->thread_info.current_volume = u->source->reference_volume;
411             break;
412     }
413
414     return pa_source_process_msg(o, code, data, offset, chunk);
415 }
416
417 /* Called from sink I/O thread context */
418 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
419     struct userdata *u = PA_SINK(o)->userdata;
420
421     switch (code) {
422
423         case PA_SINK_MESSAGE_GET_LATENCY:
424
425             /* The sink is _put() before the sink input is, so let's
426              * make sure we don't access it in that time. Also, the
427              * sink input is first shut down, the sink second. */
428             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
429                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
430                 *((pa_usec_t*) data) = 0;
431                 return 0;
432             }
433
434             *((pa_usec_t*) data) =
435
436                 /* Get the latency of the master sink */
437                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
438
439                 /* Add the latency internal to our sink input on top */
440                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
441
442             return 0;
443     }
444
445     return pa_sink_process_msg(o, code, data, offset, chunk);
446 }
447
448
449 /* Called from main context */
450 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
451     struct userdata *u;
452
453     pa_source_assert_ref(s);
454     pa_assert_se(u = s->userdata);
455
456     if (!PA_SOURCE_IS_LINKED(state) ||
457         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
458         return 0;
459
460     if (state == PA_SOURCE_RUNNING) {
461         /* restart timer when both sink and source are active */
462         if (IS_ACTIVE(u) && u->adjust_time)
463             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
464
465         pa_atomic_store(&u->request_resync, 1);
466         pa_source_output_cork(u->source_output, FALSE);
467     } else if (state == PA_SOURCE_SUSPENDED) {
468         pa_source_output_cork(u->source_output, TRUE);
469     }
470
471     return 0;
472 }
473
474 /* Called from main context */
475 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
476     struct userdata *u;
477
478     pa_sink_assert_ref(s);
479     pa_assert_se(u = s->userdata);
480
481     if (!PA_SINK_IS_LINKED(state) ||
482         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
483         return 0;
484
485     if (state == PA_SINK_RUNNING) {
486         /* restart timer when both sink and source are active */
487         if (IS_ACTIVE(u) && u->adjust_time)
488             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
489
490         pa_atomic_store(&u->request_resync, 1);
491         pa_sink_input_cork(u->sink_input, FALSE);
492     } else if (state == PA_SINK_SUSPENDED) {
493         pa_sink_input_cork(u->sink_input, TRUE);
494     }
495
496     return 0;
497 }
498
499 /* Called from I/O thread context */
500 static void source_update_requested_latency_cb(pa_source *s) {
501     struct userdata *u;
502
503     pa_source_assert_ref(s);
504     pa_assert_se(u = s->userdata);
505
506     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
507         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
508         return;
509
510     pa_log_debug("Source update requested latency");
511
512     /* Just hand this one over to the master source */
513     pa_source_output_set_requested_latency_within_thread(
514             u->source_output,
515             pa_source_get_requested_latency_within_thread(s));
516 }
517
518 /* Called from I/O thread context */
519 static void sink_update_requested_latency_cb(pa_sink *s) {
520     struct userdata *u;
521
522     pa_sink_assert_ref(s);
523     pa_assert_se(u = s->userdata);
524
525     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
526         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
527         return;
528
529     pa_log_debug("Sink update requested latency");
530
531     /* Just hand this one over to the master sink */
532     pa_sink_input_set_requested_latency_within_thread(
533             u->sink_input,
534             pa_sink_get_requested_latency_within_thread(s));
535 }
536
537 /* Called from I/O thread context */
538 static void sink_request_rewind_cb(pa_sink *s) {
539     struct userdata *u;
540
541     pa_sink_assert_ref(s);
542     pa_assert_se(u = s->userdata);
543
544     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
545         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
546         return;
547
548     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
549
550     /* Just hand this one over to the master sink */
551     pa_sink_input_request_rewind(u->sink_input,
552                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
553 }
554
555 /* Called from main context */
556 static void source_set_volume_cb(pa_source *s) {
557     struct userdata *u;
558
559     pa_source_assert_ref(s);
560     pa_assert_se(u = s->userdata);
561
562     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
563         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
564         return;
565
566     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
567 }
568
569 /* Called from main context */
570 static void sink_set_volume_cb(pa_sink *s) {
571     struct userdata *u;
572
573     pa_sink_assert_ref(s);
574     pa_assert_se(u = s->userdata);
575
576     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
577         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
578         return;
579
580     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
581 }
582
583 static void source_get_volume_cb(pa_source *s) {
584     struct userdata *u;
585     pa_cvolume v;
586
587     pa_source_assert_ref(s);
588     pa_assert_se(u = s->userdata);
589
590     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
591         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
592         return;
593
594     pa_source_output_get_volume(u->source_output, &v, TRUE);
595
596     if (pa_cvolume_equal(&s->real_volume, &v))
597         /* no change */
598         return;
599
600     s->real_volume = v;
601     pa_source_set_soft_volume(s, NULL);
602 }
603
604 /* Called from main context */
605 static void source_set_mute_cb(pa_source *s) {
606     struct userdata *u;
607
608     pa_source_assert_ref(s);
609     pa_assert_se(u = s->userdata);
610
611     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
612         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
613         return;
614
615     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
616 }
617
618 /* Called from main context */
619 static void sink_set_mute_cb(pa_sink *s) {
620     struct userdata *u;
621
622     pa_sink_assert_ref(s);
623     pa_assert_se(u = s->userdata);
624
625     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
626         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
627         return;
628
629     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
630 }
631
632 /* Called from main context */
633 static void source_get_mute_cb(pa_source *s) {
634     struct userdata *u;
635
636     pa_source_assert_ref(s);
637     pa_assert_se(u = s->userdata);
638
639     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
640         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
641         return;
642
643     pa_source_output_get_mute(u->source_output);
644 }
645
646 /* must be called from the input thread context */
647 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
648     int64_t diff;
649
650     if (diff_time < 0) {
651         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
652
653         if (diff > 0) {
654             /* add some extra safety samples to compensate for jitter in the
655              * timings */
656             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
657
658             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
659
660             u->sink_skip = diff;
661             u->source_skip = 0;
662         }
663     } else if (diff_time > 0) {
664         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
665
666         if (diff > 0) {
667             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
668
669             u->source_skip = diff;
670             u->sink_skip = 0;
671         }
672     }
673 }
674
675 /* must be called from the input thread */
676 static void do_resync(struct userdata *u) {
677     int64_t diff_time;
678     struct snapshot latency_snapshot;
679
680     pa_log("Doing resync");
681
682     /* update our snapshot */
683     source_output_snapshot_within_thread(u, &latency_snapshot);
684     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
685
686     /* calculate drift between capture and playback */
687     diff_time = calc_diff(u, &latency_snapshot);
688
689     /* and adjust for the drift */
690     apply_diff_time(u, diff_time);
691 }
692
693 /* 1. Calculate drift at this point, pass to canceller
694  * 2. Push out playback samples in blocksize chunks
695  * 3. Push out capture samples in blocksize chunks
696  * 4. ???
697  * 5. Profit
698  */
699 static void do_push_drift_comp(struct userdata *u) {
700     size_t rlen, plen;
701     pa_memchunk rchunk, pchunk, cchunk;
702     uint8_t *rdata, *pdata, *cdata;
703     float drift;
704     int unused PA_GCC_UNUSED;
705
706     rlen = pa_memblockq_get_length(u->source_memblockq);
707     plen = pa_memblockq_get_length(u->sink_memblockq);
708
709     /* Estimate snapshot drift as follows:
710      *   pd: amount of data consumed since last time
711      *   rd: amount of data consumed since last time
712      *
713      *   drift = (pd - rd) / rd;
714      *
715      * We calculate pd and rd as the memblockq length less the number of
716      * samples left from the last iteration (to avoid double counting
717      * those remainder samples.
718      */
719     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
720     u->sink_rem = plen % u->blocksize;
721     u->source_rem = rlen % u->blocksize;
722
723     /* Now let the canceller work its drift compensation magic */
724     u->ec->set_drift(u->ec, drift);
725
726     if (u->save_aec) {
727         if (u->drift_file)
728             fprintf(u->drift_file, "d %a\n", drift);
729     }
730
731     /* Send in the playback samples first */
732     while (plen >= u->blocksize) {
733         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
734         pdata = pa_memblock_acquire(pchunk.memblock);
735         pdata += pchunk.index;
736
737         u->ec->play(u->ec, pdata);
738
739         if (u->save_aec) {
740             if (u->drift_file)
741                 fprintf(u->drift_file, "p %d\n", u->blocksize);
742             if (u->played_file)
743                 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
744         }
745
746         pa_memblock_release(pchunk.memblock);
747         pa_memblockq_drop(u->sink_memblockq, u->blocksize);
748         pa_memblock_unref(pchunk.memblock);
749
750         plen -= u->blocksize;
751     }
752
753     /* And now the capture samples */
754     while (rlen >= u->blocksize) {
755         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
756
757         rdata = pa_memblock_acquire(rchunk.memblock);
758         rdata += rchunk.index;
759
760         cchunk.index = 0;
761         cchunk.length = u->blocksize;
762         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
763         cdata = pa_memblock_acquire(cchunk.memblock);
764
765         u->ec->record(u->ec, rdata, cdata);
766
767         if (u->save_aec) {
768             if (u->drift_file)
769                 fprintf(u->drift_file, "c %d\n", u->blocksize);
770             if (u->captured_file)
771                 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
772             if (u->canceled_file)
773                 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
774         }
775
776         pa_memblock_release(cchunk.memblock);
777         pa_memblock_release(rchunk.memblock);
778
779         pa_memblock_unref(rchunk.memblock);
780
781         pa_source_post(u->source, &cchunk);
782         pa_memblock_unref(cchunk.memblock);
783
784         pa_memblockq_drop(u->source_memblockq, u->blocksize);
785         rlen -= u->blocksize;
786     }
787 }
788
789 /* This one's simpler than the drift compensation case -- we just iterate over
790  * the capture buffer, and pass the canceller blocksize bytes of playback and
791  * capture data. */
792 static void do_push(struct userdata *u) {
793     size_t rlen, plen;
794     pa_memchunk rchunk, pchunk, cchunk;
795     uint8_t *rdata, *pdata, *cdata;
796     int unused PA_GCC_UNUSED;
797
798     rlen = pa_memblockq_get_length(u->source_memblockq);
799     plen = pa_memblockq_get_length(u->sink_memblockq);
800
801     while (rlen >= u->blocksize) {
802         /* take fixed block from recorded samples */
803         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
804
805         if (plen > u->blocksize) {
806             if (plen > u->blocksize) {
807                 /* take fixed block from played samples */
808                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
809
810                 rdata = pa_memblock_acquire(rchunk.memblock);
811                 rdata += rchunk.index;
812                 pdata = pa_memblock_acquire(pchunk.memblock);
813                 pdata += pchunk.index;
814
815                 cchunk.index = 0;
816                 cchunk.length = u->blocksize;
817                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
818                 cdata = pa_memblock_acquire(cchunk.memblock);
819
820                 if (u->save_aec) {
821                     if (u->captured_file)
822                         unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
823                     if (u->played_file)
824                         unused = fwrite(pdata, 1, u->blocksize, u->played_file);
825                 }
826
827                 /* perform echo cancellation */
828                 u->ec->run(u->ec, rdata, pdata, cdata);
829
830                 if (u->save_aec) {
831                     if (u->canceled_file)
832                         unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
833                 }
834
835                 pa_memblock_release(cchunk.memblock);
836                 pa_memblock_release(pchunk.memblock);
837                 pa_memblock_release(rchunk.memblock);
838
839                 /* drop consumed sink samples */
840                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
841                 pa_memblock_unref(pchunk.memblock);
842
843                 pa_memblock_unref(rchunk.memblock);
844                 /* the filtered samples now become the samples from our
845                  * source */
846                 rchunk = cchunk;
847
848                 plen -= u->blocksize;
849             }
850         }
851
852         /* forward the (echo-canceled) data to the virtual source */
853         pa_source_post(u->source, &rchunk);
854         pa_memblock_unref(rchunk.memblock);
855
856         pa_memblockq_drop(u->source_memblockq, u->blocksize);
857         rlen -= u->blocksize;
858     }
859 }
860
861 /* Called from input thread context */
862 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
863     struct userdata *u;
864     size_t rlen, plen, to_skip;
865     pa_memchunk rchunk;
866
867     pa_source_output_assert_ref(o);
868     pa_source_output_assert_io_context(o);
869     pa_assert_se(u = o->userdata);
870
871     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
872         pa_log("push when no link?");
873         return;
874     }
875
876     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
877                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
878         pa_source_post(u->source, chunk);
879         return;
880     }
881
882     /* handle queued messages, do any message sending of our own */
883     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
884         ;
885
886     pa_memblockq_push_align(u->source_memblockq, chunk);
887
888     rlen = pa_memblockq_get_length(u->source_memblockq);
889     plen = pa_memblockq_get_length(u->sink_memblockq);
890
891     /* Let's not do anything else till we have enough data to process */
892     if (rlen < u->blocksize)
893         return;
894
895     /* See if we need to drop samples in order to sync */
896     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
897         do_resync(u);
898     }
899
900     /* Okay, skip cancellation for skipped source samples if needed. */
901     if (PA_UNLIKELY(u->source_skip)) {
902         /* The slightly tricky bit here is that we drop all but modulo
903          * blocksize bytes and then adjust for that last bit on the sink side.
904          * We do this because the source data is coming at a fixed rate, which
905          * means the only way to try to catch up is drop sink samples and let
906          * the canceller cope up with this. */
907         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
908         to_skip -= to_skip % u->blocksize;
909
910         if (to_skip) {
911             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
912             pa_source_post(u->source, &rchunk);
913
914             pa_memblock_unref(rchunk.memblock);
915             pa_memblockq_drop(u->source_memblockq, u->blocksize);
916
917             rlen -= to_skip;
918             u->source_skip -= to_skip;
919         }
920
921         if (rlen && u->source_skip % u->blocksize) {
922             u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
923             u->source_skip -= (u->source_skip % u->blocksize);
924         }
925     }
926
927     /* And for the sink, these samples have been played back already, so we can
928      * just drop them and get on with it. */
929     if (PA_UNLIKELY(u->sink_skip)) {
930         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
931
932         pa_memblockq_drop(u->sink_memblockq, to_skip);
933
934         plen -= to_skip;
935         u->sink_skip -= to_skip;
936     }
937
938     /* process and push out samples */
939     if (u->ec->params.drift_compensation)
940         do_push_drift_comp(u);
941     else
942         do_push(u);
943 }
944
945 /* Called from I/O thread context */
946 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
947     struct userdata *u;
948
949     pa_sink_input_assert_ref(i);
950     pa_assert(chunk);
951     pa_assert_se(u = i->userdata);
952
953     if (u->sink->thread_info.rewind_requested)
954         pa_sink_process_rewind(u->sink, 0);
955
956     pa_sink_render_full(u->sink, nbytes, chunk);
957
958     if (i->thread_info.underrun_for > 0) {
959         pa_log_debug("Handling end of underrun.");
960         pa_atomic_store(&u->request_resync, 1);
961     }
962
963     /* let source thread handle the chunk. pass the sample count as well so that
964      * the source IO thread can update the right variables. */
965     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
966         NULL, 0, chunk, NULL);
967     u->send_counter += chunk->length;
968
969     return 0;
970 }
971
972 /* Called from input thread context */
973 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
974     struct userdata *u;
975
976     pa_source_output_assert_ref(o);
977     pa_source_output_assert_io_context(o);
978     pa_assert_se(u = o->userdata);
979
980     pa_source_process_rewind(u->source, nbytes);
981
982     /* go back on read side, we need to use older sink data for this */
983     pa_memblockq_rewind(u->sink_memblockq, nbytes);
984
985     /* manipulate write index */
986     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
987
988     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
989         (long long) pa_memblockq_get_length (u->source_memblockq));
990 }
991
992 /* Called from I/O thread context */
993 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
994     struct userdata *u;
995
996     pa_sink_input_assert_ref(i);
997     pa_assert_se(u = i->userdata);
998
999     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1000
1001     pa_sink_process_rewind(u->sink, nbytes);
1002
1003     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1004     u->send_counter -= nbytes;
1005 }
1006
1007 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1008     size_t delay, rlen, plen;
1009     pa_usec_t now, latency;
1010
1011     now = pa_rtclock_now();
1012     latency = pa_source_get_latency_within_thread(u->source_output->source);
1013     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1014
1015     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1016     rlen = pa_memblockq_get_length(u->source_memblockq);
1017     plen = pa_memblockq_get_length(u->sink_memblockq);
1018
1019     snapshot->source_now = now;
1020     snapshot->source_latency = latency;
1021     snapshot->source_delay = delay;
1022     snapshot->recv_counter = u->recv_counter;
1023     snapshot->rlen = rlen + u->sink_skip;
1024     snapshot->plen = plen + u->source_skip;
1025 }
1026
1027
1028 /* Called from output thread context */
1029 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1030     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1031
1032     switch (code) {
1033
1034         case SOURCE_OUTPUT_MESSAGE_POST:
1035
1036             pa_source_output_assert_io_context(u->source_output);
1037
1038             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1039                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1040             else
1041                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1042
1043             u->recv_counter += (int64_t) chunk->length;
1044
1045             return 0;
1046
1047         case SOURCE_OUTPUT_MESSAGE_REWIND:
1048             pa_source_output_assert_io_context(u->source_output);
1049
1050             /* manipulate write index, never go past what we have */
1051             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1052                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1053             else
1054                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1055
1056             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1057
1058             u->recv_counter -= offset;
1059
1060             return 0;
1061
1062         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1063             struct snapshot *snapshot = (struct snapshot *) data;
1064
1065             source_output_snapshot_within_thread(u, snapshot);
1066             return 0;
1067         }
1068
1069         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1070             apply_diff_time(u, offset);
1071             return 0;
1072
1073     }
1074
1075     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1076 }
1077
1078 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1079     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1080
1081     switch (code) {
1082
1083         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1084             size_t delay;
1085             pa_usec_t now, latency;
1086             struct snapshot *snapshot = (struct snapshot *) data;
1087
1088             pa_sink_input_assert_io_context(u->sink_input);
1089
1090             now = pa_rtclock_now();
1091             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1092             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1093
1094             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1095
1096             snapshot->sink_now = now;
1097             snapshot->sink_latency = latency;
1098             snapshot->sink_delay = delay;
1099             snapshot->send_counter = u->send_counter;
1100             return 0;
1101         }
1102     }
1103
1104     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1105 }
1106
1107 /* Called from I/O thread context */
1108 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1109     struct userdata *u;
1110
1111     pa_sink_input_assert_ref(i);
1112     pa_assert_se(u = i->userdata);
1113
1114     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1115
1116     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1117     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1118 }
1119
1120 /* Called from I/O thread context */
1121 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1122     struct userdata *u;
1123
1124     pa_source_output_assert_ref(o);
1125     pa_assert_se(u = o->userdata);
1126
1127     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1128
1129     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1130 }
1131
1132 /* Called from I/O thread context */
1133 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1134     struct userdata *u;
1135
1136     pa_sink_input_assert_ref(i);
1137     pa_assert_se(u = i->userdata);
1138
1139     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1140
1141     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1142 }
1143
1144 /* Called from I/O thread context */
1145 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1146     struct userdata *u;
1147     pa_usec_t latency;
1148
1149     pa_sink_input_assert_ref(i);
1150     pa_assert_se(u = i->userdata);
1151
1152     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1153
1154     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1155 }
1156
1157 /* Called from I/O thread context */
1158 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1159     struct userdata *u;
1160     pa_usec_t latency;
1161
1162     pa_source_output_assert_ref(o);
1163     pa_assert_se(u = o->userdata);
1164
1165     latency = pa_source_get_requested_latency_within_thread(o->source);
1166
1167     pa_log_debug("source output update requested latency %lld", (long long) latency);
1168 }
1169
1170 /* Called from I/O thread context */
1171 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1172     struct userdata *u;
1173
1174     pa_sink_input_assert_ref(i);
1175     pa_assert_se(u = i->userdata);
1176
1177     pa_log_debug("Sink input update latency range %lld %lld",
1178         (long long) i->sink->thread_info.min_latency,
1179         (long long) i->sink->thread_info.max_latency);
1180
1181     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1182 }
1183
1184 /* Called from I/O thread context */
1185 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1186     struct userdata *u;
1187
1188     pa_source_output_assert_ref(o);
1189     pa_assert_se(u = o->userdata);
1190
1191     pa_log_debug("Source output update latency range %lld %lld",
1192         (long long) o->source->thread_info.min_latency,
1193         (long long) o->source->thread_info.max_latency);
1194
1195     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1196 }
1197
1198 /* Called from I/O thread context */
1199 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1200     struct userdata *u;
1201
1202     pa_sink_input_assert_ref(i);
1203     pa_assert_se(u = i->userdata);
1204
1205     pa_log_debug("Sink input update fixed latency %lld",
1206         (long long) i->sink->thread_info.fixed_latency);
1207
1208     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1209 }
1210
1211 /* Called from I/O thread context */
1212 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1213     struct userdata *u;
1214
1215     pa_source_output_assert_ref(o);
1216     pa_assert_se(u = o->userdata);
1217
1218     pa_log_debug("Source output update fixed latency %lld",
1219         (long long) o->source->thread_info.fixed_latency);
1220
1221     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1222 }
1223
1224 /* Called from output thread context */
1225 static void source_output_attach_cb(pa_source_output *o) {
1226     struct userdata *u;
1227
1228     pa_source_output_assert_ref(o);
1229     pa_source_output_assert_io_context(o);
1230     pa_assert_se(u = o->userdata);
1231
1232     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1233     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1234     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1235     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1236
1237     pa_log_debug("Source output %d attach", o->index);
1238
1239     pa_source_attach_within_thread(u->source);
1240
1241     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1242             o->source->thread_info.rtpoll,
1243             PA_RTPOLL_LATE,
1244             u->asyncmsgq);
1245 }
1246
1247 /* Called from I/O thread context */
1248 static void sink_input_attach_cb(pa_sink_input *i) {
1249     struct userdata *u;
1250
1251     pa_sink_input_assert_ref(i);
1252     pa_assert_se(u = i->userdata);
1253
1254     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1255     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1256
1257     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1258      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1259     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1260
1261     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1262      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1263      * HERE. SEE (6) */
1264     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1265     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1266
1267     pa_log_debug("Sink input %d attach", i->index);
1268
1269     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1270             i->sink->thread_info.rtpoll,
1271             PA_RTPOLL_LATE,
1272             u->asyncmsgq);
1273
1274     pa_sink_attach_within_thread(u->sink);
1275 }
1276
1277
1278 /* Called from output thread context */
1279 static void source_output_detach_cb(pa_source_output *o) {
1280     struct userdata *u;
1281
1282     pa_source_output_assert_ref(o);
1283     pa_source_output_assert_io_context(o);
1284     pa_assert_se(u = o->userdata);
1285
1286     pa_source_detach_within_thread(u->source);
1287     pa_source_set_rtpoll(u->source, NULL);
1288
1289     pa_log_debug("Source output %d detach", o->index);
1290
1291     if (u->rtpoll_item_read) {
1292         pa_rtpoll_item_free(u->rtpoll_item_read);
1293         u->rtpoll_item_read = NULL;
1294     }
1295 }
1296
1297 /* Called from I/O thread context */
1298 static void sink_input_detach_cb(pa_sink_input *i) {
1299     struct userdata *u;
1300
1301     pa_sink_input_assert_ref(i);
1302     pa_assert_se(u = i->userdata);
1303
1304     pa_sink_detach_within_thread(u->sink);
1305
1306     pa_sink_set_rtpoll(u->sink, NULL);
1307
1308     pa_log_debug("Sink input %d detach", i->index);
1309
1310     if (u->rtpoll_item_write) {
1311         pa_rtpoll_item_free(u->rtpoll_item_write);
1312         u->rtpoll_item_write = NULL;
1313     }
1314 }
1315
1316 /* Called from output thread context */
1317 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1318     struct userdata *u;
1319
1320     pa_source_output_assert_ref(o);
1321     pa_source_output_assert_io_context(o);
1322     pa_assert_se(u = o->userdata);
1323
1324     pa_log_debug("Source output %d state %d", o->index, state);
1325 }
1326
1327 /* Called from IO thread context */
1328 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1329     struct userdata *u;
1330
1331     pa_sink_input_assert_ref(i);
1332     pa_assert_se(u = i->userdata);
1333
1334     pa_log_debug("Sink input %d state %d", i->index, state);
1335
1336     /* If we are added for the first time, ask for a rewinding so that
1337      * we are heard right-away. */
1338     if (PA_SINK_INPUT_IS_LINKED(state) &&
1339         i->thread_info.state == PA_SINK_INPUT_INIT) {
1340         pa_log_debug("Requesting rewind due to state change.");
1341         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1342     }
1343 }
1344
1345 /* Called from main thread */
1346 static void source_output_kill_cb(pa_source_output *o) {
1347     struct userdata *u;
1348
1349     pa_source_output_assert_ref(o);
1350     pa_assert_ctl_context();
1351     pa_assert_se(u = o->userdata);
1352
1353     u->dead = TRUE;
1354
1355     /* The order here matters! We first kill the source output, followed
1356      * by the source. That means the source callbacks must be protected
1357      * against an unconnected source output! */
1358     pa_source_output_unlink(u->source_output);
1359     pa_source_unlink(u->source);
1360
1361     pa_source_output_unref(u->source_output);
1362     u->source_output = NULL;
1363
1364     pa_source_unref(u->source);
1365     u->source = NULL;
1366
1367     pa_log_debug("Source output kill %d", o->index);
1368
1369     pa_module_unload_request(u->module, TRUE);
1370 }
1371
1372 /* Called from main context */
1373 static void sink_input_kill_cb(pa_sink_input *i) {
1374     struct userdata *u;
1375
1376     pa_sink_input_assert_ref(i);
1377     pa_assert_se(u = i->userdata);
1378
1379     u->dead = TRUE;
1380
1381     /* The order here matters! We first kill the sink input, followed
1382      * by the sink. That means the sink callbacks must be protected
1383      * against an unconnected sink input! */
1384     pa_sink_input_unlink(u->sink_input);
1385     pa_sink_unlink(u->sink);
1386
1387     pa_sink_input_unref(u->sink_input);
1388     u->sink_input = NULL;
1389
1390     pa_sink_unref(u->sink);
1391     u->sink = NULL;
1392
1393     pa_log_debug("Sink input kill %d", i->index);
1394
1395     pa_module_unload_request(u->module, TRUE);
1396 }
1397
1398 /* Called from main thread */
1399 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1400     struct userdata *u;
1401
1402     pa_source_output_assert_ref(o);
1403     pa_assert_ctl_context();
1404     pa_assert_se(u = o->userdata);
1405
1406     if (u->dead || u->autoloaded)
1407         return FALSE;
1408
1409     return (u->source != dest) && (u->sink != dest->monitor_of);
1410 }
1411
1412 /* Called from main context */
1413 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1414     struct userdata *u;
1415
1416     pa_sink_input_assert_ref(i);
1417     pa_assert_se(u = i->userdata);
1418
1419     if (u->dead || u->autoloaded)
1420         return FALSE;
1421
1422     return u->sink != dest;
1423 }
1424
1425 /* Called from main thread */
1426 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1427     struct userdata *u;
1428
1429     pa_source_output_assert_ref(o);
1430     pa_assert_ctl_context();
1431     pa_assert_se(u = o->userdata);
1432
1433     if (dest) {
1434         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1435         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1436     } else
1437         pa_source_set_asyncmsgq(u->source, NULL);
1438
1439     if (u->source_auto_desc && dest) {
1440         const char *y, *z;
1441         pa_proplist *pl;
1442
1443         pl = pa_proplist_new();
1444         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1445         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1446         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1447                 y ? y : u->sink_input->sink->name);
1448
1449         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1450         pa_proplist_free(pl);
1451     }
1452 }
1453
1454 /* Called from main context */
1455 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1456     struct userdata *u;
1457
1458     pa_sink_input_assert_ref(i);
1459     pa_assert_se(u = i->userdata);
1460
1461     if (dest) {
1462         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1463         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1464     } else
1465         pa_sink_set_asyncmsgq(u->sink, NULL);
1466
1467     if (u->sink_auto_desc && dest) {
1468         const char *y, *z;
1469         pa_proplist *pl;
1470
1471         pl = pa_proplist_new();
1472         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1473         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1474         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1475                          y ? y : u->source_output->source->name);
1476
1477         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1478         pa_proplist_free(pl);
1479     }
1480 }
1481
1482 /* Called from main context */
1483 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1484     struct userdata *u;
1485
1486     pa_sink_input_assert_ref(i);
1487     pa_assert_se(u = i->userdata);
1488
1489     pa_sink_volume_changed(u->sink, &i->volume);
1490 }
1491
1492 /* Called from main context */
1493 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1494     struct userdata *u;
1495
1496     pa_sink_input_assert_ref(i);
1497     pa_assert_se(u = i->userdata);
1498
1499     pa_sink_mute_changed(u->sink, i->muted);
1500 }
1501
1502 /* Called from main context */
1503 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1504     struct pa_echo_canceller_msg *msg;
1505     struct userdata *u;
1506
1507     pa_assert(o);
1508
1509     msg = PA_ECHO_CANCELLER_MSG(o);
1510     u = msg->userdata;
1511
1512     switch (code) {
1513         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1514             pa_cvolume *v = (pa_cvolume *) userdata;
1515
1516             if (u->use_volume_sharing)
1517                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1518             else
1519                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1520
1521             break;
1522         }
1523
1524         default:
1525             pa_assert_not_reached();
1526             break;
1527     }
1528
1529     return 0;
1530 }
1531
1532 /* Called by the canceller, so thread context */
1533 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1534     *v = ec->msg->userdata->thread_info.current_volume;
1535 }
1536
1537 /* Called by the canceller, so thread context */
1538 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1539     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1540         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1541
1542         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1543                 pa_xfree);
1544     }
1545 }
1546
1547 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1548 #ifdef HAVE_SPEEX
1549     if (pa_streq(method, "speex"))
1550         return PA_ECHO_CANCELLER_SPEEX;
1551 #endif
1552     if (pa_streq(method, "adrian"))
1553         return PA_ECHO_CANCELLER_ADRIAN;
1554 #ifdef HAVE_WEBRTC
1555     if (pa_streq(method, "webrtc"))
1556         return PA_ECHO_CANCELLER_WEBRTC;
1557 #endif
1558     return PA_ECHO_CANCELLER_INVALID;
1559 }
1560
1561 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1562 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1563     pa_echo_canceller_method_t ec_method;
1564
1565     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1566         pa_log("Invalid sample format specification or channel map");
1567         goto fail;
1568     }
1569
1570     u->ec = pa_xnew0(pa_echo_canceller, 1);
1571     if (!u->ec) {
1572         pa_log("Failed to alloc echo canceller");
1573         goto fail;
1574     }
1575
1576     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1577         pa_log("Invalid echo canceller implementation");
1578         goto fail;
1579     }
1580
1581     u->ec->init = ec_table[ec_method].init;
1582     u->ec->play = ec_table[ec_method].play;
1583     u->ec->record = ec_table[ec_method].record;
1584     u->ec->set_drift = ec_table[ec_method].set_drift;
1585     u->ec->run = ec_table[ec_method].run;
1586     u->ec->done = ec_table[ec_method].done;
1587
1588     return 0;
1589
1590 fail:
1591     return -1;
1592 }
1593
1594
1595 int pa__init(pa_module*m) {
1596     struct userdata *u;
1597     pa_sample_spec source_ss, sink_ss;
1598     pa_channel_map source_map, sink_map;
1599     pa_modargs *ma;
1600     pa_source *source_master=NULL;
1601     pa_sink *sink_master=NULL;
1602     pa_source_output_new_data source_output_data;
1603     pa_sink_input_new_data sink_input_data;
1604     pa_source_new_data source_data;
1605     pa_sink_new_data sink_data;
1606     pa_memchunk silence;
1607     uint32_t temp;
1608
1609     pa_assert(m);
1610
1611     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1612         pa_log("Failed to parse module arguments.");
1613         goto fail;
1614     }
1615
1616     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1617         pa_log("Master source not found");
1618         goto fail;
1619     }
1620     pa_assert(source_master);
1621
1622     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1623         pa_log("Master sink not found");
1624         goto fail;
1625     }
1626     pa_assert(sink_master);
1627
1628     if (source_master->monitor_of == sink_master) {
1629         pa_log("Can't cancel echo between a sink and its monitor");
1630         goto fail;
1631     }
1632
1633     source_ss = source_master->sample_spec;
1634     source_ss.rate = DEFAULT_RATE;
1635     source_ss.channels = DEFAULT_CHANNELS;
1636     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1637
1638     sink_ss = sink_master->sample_spec;
1639     sink_map = sink_master->channel_map;
1640
1641     u = pa_xnew0(struct userdata, 1);
1642     if (!u) {
1643         pa_log("Failed to alloc userdata");
1644         goto fail;
1645     }
1646     u->core = m->core;
1647     u->module = m;
1648     m->userdata = u;
1649     u->dead = FALSE;
1650
1651     u->use_volume_sharing = TRUE;
1652     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1653         pa_log("use_volume_sharing= expects a boolean argument");
1654         goto fail;
1655     }
1656
1657     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1658     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1659         pa_log("Failed to parse adjust_time value");
1660         goto fail;
1661     }
1662
1663     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1664         u->adjust_time = temp * PA_USEC_PER_SEC;
1665     else
1666         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1667
1668     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1669     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1670         pa_log("Failed to parse adjust_threshold value");
1671         goto fail;
1672     }
1673
1674     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1675         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1676     else
1677         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1678
1679     u->save_aec = DEFAULT_SAVE_AEC;
1680     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1681         pa_log("Failed to parse save_aec value");
1682         goto fail;
1683     }
1684
1685     u->autoloaded = DEFAULT_AUTOLOADED;
1686     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1687         pa_log("Failed to parse autoloaded value");
1688         goto fail;
1689     }
1690
1691     if (init_common(ma, u, &source_ss, &source_map))
1692         goto fail;
1693
1694     u->asyncmsgq = pa_asyncmsgq_new(0);
1695     u->need_realign = TRUE;
1696
1697     if (u->ec->init) {
1698         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1699             pa_log("Failed to init AEC engine");
1700             goto fail;
1701         }
1702     }
1703
1704     if (u->ec->params.drift_compensation)
1705         pa_assert(u->ec->set_drift);
1706
1707     /* Create source */
1708     pa_source_new_data_init(&source_data);
1709     source_data.driver = __FILE__;
1710     source_data.module = m;
1711     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1712         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1713     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1714     pa_source_new_data_set_channel_map(&source_data, &source_map);
1715     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1716     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1717     if (!u->autoloaded)
1718         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1719
1720     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1721         pa_log("Invalid properties");
1722         pa_source_new_data_done(&source_data);
1723         goto fail;
1724     }
1725
1726     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1727         const char *y, *z;
1728
1729         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1730         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1731         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1732                 z ? z : source_master->name, y ? y : sink_master->name);
1733     }
1734
1735     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1736                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1737     pa_source_new_data_done(&source_data);
1738
1739     if (!u->source) {
1740         pa_log("Failed to create source.");
1741         goto fail;
1742     }
1743
1744     u->source->parent.process_msg = source_process_msg_cb;
1745     u->source->set_state = source_set_state_cb;
1746     u->source->update_requested_latency = source_update_requested_latency_cb;
1747     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1748     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1749     if (!u->use_volume_sharing) {
1750         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1751         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1752         pa_source_enable_decibel_volume(u->source, TRUE);
1753     }
1754     u->source->userdata = u;
1755
1756     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1757
1758     /* Create sink */
1759     pa_sink_new_data_init(&sink_data);
1760     sink_data.driver = __FILE__;
1761     sink_data.module = m;
1762     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1763         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1764     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1765     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1766     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1767     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1768     if (!u->autoloaded)
1769         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1770
1771     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1772         pa_log("Invalid properties");
1773         pa_sink_new_data_done(&sink_data);
1774         goto fail;
1775     }
1776
1777     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1778         const char *y, *z;
1779
1780         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1781         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1782         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1783                 z ? z : sink_master->name, y ? y : source_master->name);
1784     }
1785
1786     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1787                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1788     pa_sink_new_data_done(&sink_data);
1789
1790     if (!u->sink) {
1791         pa_log("Failed to create sink.");
1792         goto fail;
1793     }
1794
1795     u->sink->parent.process_msg = sink_process_msg_cb;
1796     u->sink->set_state = sink_set_state_cb;
1797     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1798     u->sink->request_rewind = sink_request_rewind_cb;
1799     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1800     if (!u->use_volume_sharing) {
1801         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1802         pa_sink_enable_decibel_volume(u->sink, TRUE);
1803     }
1804     u->sink->userdata = u;
1805
1806     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1807
1808     /* Create source output */
1809     pa_source_output_new_data_init(&source_output_data);
1810     source_output_data.driver = __FILE__;
1811     source_output_data.module = m;
1812     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1813     source_output_data.destination_source = u->source;
1814     /* FIXME
1815        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1816
1817     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1818     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1819     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1820     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1821
1822     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1823     pa_source_output_new_data_done(&source_output_data);
1824
1825     if (!u->source_output)
1826         goto fail;
1827
1828     u->source_output->parent.process_msg = source_output_process_msg_cb;
1829     u->source_output->push = source_output_push_cb;
1830     u->source_output->process_rewind = source_output_process_rewind_cb;
1831     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1832     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1833     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1834     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1835     u->source_output->kill = source_output_kill_cb;
1836     u->source_output->attach = source_output_attach_cb;
1837     u->source_output->detach = source_output_detach_cb;
1838     u->source_output->state_change = source_output_state_change_cb;
1839     u->source_output->may_move_to = source_output_may_move_to_cb;
1840     u->source_output->moving = source_output_moving_cb;
1841     u->source_output->userdata = u;
1842
1843     u->source->output_from_master = u->source_output;
1844
1845     /* Create sink input */
1846     pa_sink_input_new_data_init(&sink_input_data);
1847     sink_input_data.driver = __FILE__;
1848     sink_input_data.module = m;
1849     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1850     sink_input_data.origin_sink = u->sink;
1851     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1852     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1853     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1854     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1855     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1856
1857     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1858     pa_sink_input_new_data_done(&sink_input_data);
1859
1860     if (!u->sink_input)
1861         goto fail;
1862
1863     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1864     u->sink_input->pop = sink_input_pop_cb;
1865     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1866     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1867     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1868     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1869     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1870     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1871     u->sink_input->kill = sink_input_kill_cb;
1872     u->sink_input->attach = sink_input_attach_cb;
1873     u->sink_input->detach = sink_input_detach_cb;
1874     u->sink_input->state_change = sink_input_state_change_cb;
1875     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1876     u->sink_input->moving = sink_input_moving_cb;
1877     if (!u->use_volume_sharing)
1878         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1879     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1880     u->sink_input->userdata = u;
1881
1882     u->sink->input_to_master = u->sink_input;
1883
1884     pa_sink_input_get_silence(u->sink_input, &silence);
1885
1886     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1887         &source_ss, 1, 1, 0, &silence);
1888     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1889         &sink_ss, 1, 1, 0, &silence);
1890
1891     pa_memblock_unref(silence.memblock);
1892
1893     if (!u->source_memblockq || !u->sink_memblockq) {
1894         pa_log("Failed to create memblockq.");
1895         goto fail;
1896     }
1897
1898     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1899         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1900     else if (u->ec->params.drift_compensation) {
1901         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1902         u->adjust_time = 0;
1903         /* Perform resync just once to give the canceller a leg up */
1904         pa_atomic_store(&u->request_resync, 1);
1905     }
1906
1907     if (u->save_aec) {
1908         pa_log("Creating AEC files in /tmp");
1909         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1910         if (u->captured_file == NULL)
1911             perror ("fopen failed");
1912         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1913         if (u->played_file == NULL)
1914             perror ("fopen failed");
1915         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1916         if (u->canceled_file == NULL)
1917             perror ("fopen failed");
1918         if (u->ec->params.drift_compensation) {
1919             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1920             if (u->drift_file == NULL)
1921                 perror ("fopen failed");
1922         }
1923     }
1924
1925     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1926     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1927     u->ec->msg->userdata = u;
1928
1929     u->thread_info.current_volume = u->source->reference_volume;
1930
1931     pa_sink_put(u->sink);
1932     pa_source_put(u->source);
1933
1934     pa_sink_input_put(u->sink_input);
1935     pa_source_output_put(u->source_output);
1936     pa_modargs_free(ma);
1937
1938     return 0;
1939
1940 fail:
1941     if (ma)
1942         pa_modargs_free(ma);
1943
1944     pa__done(m);
1945
1946     return -1;
1947 }
1948
1949 int pa__get_n_used(pa_module *m) {
1950     struct userdata *u;
1951
1952     pa_assert(m);
1953     pa_assert_se(u = m->userdata);
1954
1955     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1956 }
1957
1958 void pa__done(pa_module*m) {
1959     struct userdata *u;
1960
1961     pa_assert(m);
1962
1963     if (!(u = m->userdata))
1964         return;
1965
1966     u->dead = TRUE;
1967
1968     /* See comments in source_output_kill_cb() above regarding
1969      * destruction order! */
1970
1971     if (u->time_event)
1972         u->core->mainloop->time_free(u->time_event);
1973
1974     if (u->source_output)
1975         pa_source_output_unlink(u->source_output);
1976     if (u->sink_input)
1977         pa_sink_input_unlink(u->sink_input);
1978
1979     if (u->source)
1980         pa_source_unlink(u->source);
1981     if (u->sink)
1982         pa_sink_unlink(u->sink);
1983
1984     if (u->source_output)
1985         pa_source_output_unref(u->source_output);
1986     if (u->sink_input)
1987         pa_sink_input_unref(u->sink_input);
1988
1989     if (u->source)
1990         pa_source_unref(u->source);
1991     if (u->sink)
1992         pa_sink_unref(u->sink);
1993
1994     if (u->source_memblockq)
1995         pa_memblockq_free(u->source_memblockq);
1996     if (u->sink_memblockq)
1997         pa_memblockq_free(u->sink_memblockq);
1998
1999     if (u->ec) {
2000         if (u->ec->done)
2001             u->ec->done(u->ec);
2002
2003         pa_xfree(u->ec);
2004     }
2005
2006     if (u->asyncmsgq)
2007         pa_asyncmsgq_unref(u->asyncmsgq);
2008
2009     if (u->save_aec) {
2010         if (u->played_file)
2011             fclose(u->played_file);
2012         if (u->captured_file)
2013             fclose(u->captured_file);
2014         if (u->canceled_file)
2015             fclose(u->canceled_file);
2016         if (u->drift_file)
2017             fclose(u->drift_file);
2018     }
2019
2020     pa_xfree(u);
2021 }
2022
2023 #ifdef ECHO_CANCEL_TEST
2024 /*
2025  * Stand-alone test program for running in the canceller on pre-recorded files.
2026  */
2027 int main(int argc, char* argv[]) {
2028     struct userdata u;
2029     pa_sample_spec source_ss, sink_ss;
2030     pa_channel_map source_map, sink_map;
2031     pa_modargs *ma = NULL;
2032     uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2033     int unused PA_GCC_UNUSED;
2034     int ret = 0, i;
2035     char c;
2036     float drift;
2037
2038     pa_memzero(&u, sizeof(u));
2039
2040     if (argc < 4 || argc > 7) {
2041         goto usage;
2042     }
2043
2044     u.captured_file = fopen(argv[2], "r");
2045     if (u.captured_file == NULL) {
2046         perror ("fopen failed");
2047         goto fail;
2048     }
2049     u.played_file = fopen(argv[1], "r");
2050     if (u.played_file == NULL) {
2051         perror ("fopen failed");
2052         goto fail;
2053     }
2054     u.canceled_file = fopen(argv[3], "wb");
2055     if (u.canceled_file == NULL) {
2056         perror ("fopen failed");
2057         goto fail;
2058     }
2059
2060     u.core = pa_xnew0(pa_core, 1);
2061     u.core->cpu_info.cpu_type = PA_CPU_X86;
2062     u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2063
2064     if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2065         pa_log("Failed to parse module arguments.");
2066         goto fail;
2067     }
2068
2069     source_ss.format = PA_SAMPLE_S16LE;
2070     source_ss.rate = DEFAULT_RATE;
2071     source_ss.channels = DEFAULT_CHANNELS;
2072     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2073
2074     init_common(ma, &u, &source_ss, &source_map);
2075
2076     if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
2077                      (argc > 4) ? argv[5] : NULL )) {
2078         pa_log("Failed to init AEC engine");
2079         goto fail;
2080     }
2081
2082     if (u.ec->params.drift_compensation) {
2083         if (argc < 7) {
2084             pa_log("Drift compensation enabled but drift file not specified");
2085             goto fail;
2086         }
2087
2088         u.drift_file = fopen(argv[6], "r");
2089
2090         if (u.drift_file == NULL) {
2091             perror ("fopen failed");
2092             goto fail;
2093         }
2094     }
2095
2096     rdata = pa_xmalloc(u.blocksize);
2097     pdata = pa_xmalloc(u.blocksize);
2098     cdata = pa_xmalloc(u.blocksize);
2099
2100     if (!u.ec->params.drift_compensation) {
2101         while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
2102             if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
2103                 perror("Played file ended before captured file");
2104                 goto fail;
2105             }
2106
2107             u.ec->run(u.ec, rdata, pdata, cdata);
2108
2109             unused = fwrite(cdata, u.blocksize, 1, u.canceled_file);
2110         }
2111     } else {
2112         while (fscanf(u.drift_file, "%c", &c) > 0) {
2113             switch (c) {
2114                 case 'd':
2115                     if (!fscanf(u.drift_file, "%a", &drift)) {
2116                         perror("Drift file incomplete");
2117                         goto fail;
2118                     }
2119
2120                     u.ec->set_drift(u.ec, drift);
2121
2122                     break;
2123
2124                 case 'c':
2125                     if (!fscanf(u.drift_file, "%d", &i)) {
2126                         perror("Drift file incomplete");
2127                         goto fail;
2128                     }
2129
2130                     if (fread(rdata, i, 1, u.captured_file) <= 0) {
2131                         perror("Captured file ended prematurely");
2132                         goto fail;
2133                     }
2134
2135                     u.ec->record(u.ec, rdata, cdata);
2136
2137                     unused = fwrite(cdata, i, 1, u.canceled_file);
2138
2139                     break;
2140
2141                 case 'p':
2142                     if (!fscanf(u.drift_file, "%d", &i)) {
2143                         perror("Drift file incomplete");
2144                         goto fail;
2145                     }
2146
2147                     if (fread(pdata, i, 1, u.played_file) <= 0) {
2148                         perror("Played file ended prematurely");
2149                         goto fail;
2150                     }
2151
2152                     u.ec->play(u.ec, pdata);
2153
2154                     break;
2155             }
2156         }
2157
2158         if (fread(rdata, i, 1, u.captured_file) > 0)
2159             pa_log("All capture data was not consumed");
2160         if (fread(pdata, i, 1, u.played_file) > 0)
2161             pa_log("All playback data was not consumed");
2162     }
2163
2164     u.ec->done(u.ec);
2165
2166     fclose(u.captured_file);
2167     fclose(u.played_file);
2168     fclose(u.canceled_file);
2169     if (u.drift_file)
2170         fclose(u.drift_file);
2171
2172 out:
2173     pa_xfree(rdata);
2174     pa_xfree(pdata);
2175     pa_xfree(cdata);
2176
2177     pa_xfree(u.ec);
2178     pa_xfree(u.core);
2179
2180     if (ma)
2181         pa_modargs_free(ma);
2182
2183     return ret;
2184
2185 usage:
2186     pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2187
2188 fail:
2189     ret = -1;
2190     goto out;
2191 }
2192 #endif /* ECHO_CANCEL_TEST */