echo-cancel: Make WebRTC the default canceller
[profile/ivi/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
/* Module metadata. The usage string documents the accepted module
 * arguments; the same keys are listed in valid_modargs. */
PA_MODULE_AUTHOR("Wim Taymans");
PA_MODULE_DESCRIPTION("Echo Cancellation");
PA_MODULE_VERSION(PACKAGE_VERSION);
PA_MODULE_LOAD_ONCE(FALSE);
PA_MODULE_USAGE(
        _("source_name=<name for the source> "
          "source_properties=<properties for the source> "
          "source_master=<name of source to filter> "
          "sink_name=<name for the sink> "
          "sink_properties=<properties for the sink> "
          "sink_master=<name of sink to filter> "
          "adjust_time=<how often to readjust rates in s> "
          "adjust_threshold=<how much drift to readjust after in ms> "
          "format=<sample format> "
          "rate=<sample rate> "
          "channels=<number of channels> "
          "channel_map=<channel map> "
          "aec_method=<implementation to use> "
          "aec_args=<parameters for the AEC engine> "
          "save_aec=<save AEC data in /tmp> "
          "autoloaded=<set if this module is being loaded automatically> "
          "use_volume_sharing=<yes or no> "
        ));
81
/* Identifiers of the available canceller implementations.
 *
 * NOTE: the enumerators and the entries of ec_table must be kept in the
 * same order. */
typedef enum {
    PA_ECHO_CANCELLER_INVALID = -1,
    PA_ECHO_CANCELLER_SPEEX   = 0,
    PA_ECHO_CANCELLER_ADRIAN  = 1,
#ifdef HAVE_WEBRTC
    PA_ECHO_CANCELLER_WEBRTC  = 2,
#endif
} pa_echo_canceller_method_t;
91
/* Name of the canceller used by default. WebRTC is preferred, but it is
 * only compiled in when HAVE_WEBRTC is defined, so fall back to Speex
 * (always present in ec_table) for builds without it. */
#ifdef HAVE_WEBRTC
#define DEFAULT_ECHO_CANCELLER "webrtc"
#else
#define DEFAULT_ECHO_CANCELLER "speex"
#endif
93
94 static const pa_echo_canceller ec_table[] = {
95     {
96         /* Speex */
97         .init                   = pa_speex_ec_init,
98         .run                    = pa_speex_ec_run,
99         .done                   = pa_speex_ec_done,
100     },
101     {
102         /* Adrian Andre's NLMS implementation */
103         .init                   = pa_adrian_ec_init,
104         .run                    = pa_adrian_ec_run,
105         .done                   = pa_adrian_ec_done,
106     },
107 #ifdef HAVE_WEBRTC
108     {
109         /* WebRTC's audio processing engine */
110         .init                   = pa_webrtc_ec_init,
111         .play                   = pa_webrtc_ec_play,
112         .record                 = pa_webrtc_ec_record,
113         .set_drift              = pa_webrtc_ec_set_drift,
114         .run                    = pa_webrtc_ec_run,
115         .done                   = pa_webrtc_ec_done,
116     },
117 #endif
118 };
119
/* NOTE(review): presumably the fallbacks for the rate=, channels=,
 * adjust_time=, adjust_threshold=, save_aec= and autoloaded= module
 * arguments -- confirm against the module init code. */
#define DEFAULT_RATE 32000
#define DEFAULT_CHANNELS 1
#define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
#define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
#define DEFAULT_SAVE_AEC FALSE
#define DEFAULT_AUTOLOADED FALSE

/* 16 MiB; NOTE(review): presumably the maximum length of our memblockqs */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)

/* True only when both our virtual source and our virtual sink are in the
 * RUNNING state. Can only be used in main context */
#define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
                      (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
132
133 /* This module creates a new (virtual) source and sink.
134  *
135  * The data sent to the new sink is kept in a memblockq before being
136  * forwarded to the real sink_master.
137  *
138  * Data read from source_master is matched against the saved sink data and
139  * echo canceled data is then pushed onto the new source.
140  *
141  * Both source and sink masters have their own threads to push/pull data
142  * respectively. We however perform all our actions in the source IO thread.
143  * To do this we send all played samples to the source IO thread where they
144  * are then pushed into the memblockq.
145  *
146  * Alignment is performed in two steps:
147  *
148  * 1) when something happens that requires quick adjustment of the alignment of
149  *    capture and playback samples, we perform a resync. This adjusts the
150  *    position in the playback memblock to the requested sample. Quick
151  *    adjustments include moving the playback samples before the capture
152  *    samples (because else the echo canceler does not work) or when the
153  *    playback pointer drifts too far away.
154  *
 * 2) Periodically check the difference between capture and playback. We use a
 *    low and high watermark for adjusting the alignment. Playback should always
 *    be before capture and the difference should not be bigger than one frame
 *    size. We would ideally like to resample the sink_input, but most drivers
 *    don't give enough accuracy to be able to do that right now.
160  */
161
162 struct userdata;
163
/* Message object type private to this module; it carries a back-pointer
 * to the module's userdata. */
struct pa_echo_canceller_msg {
    pa_msgobject parent;
    struct userdata *userdata;
};

PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
/* Cast helper wrapping pa_echo_canceller_msg_cast() */
#define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
171
/* Latency snapshot of the playback (sink) and capture (source) sides,
 * consumed by calc_diff(). The *_now and *_latency members are used there
 * as times in usec; the delays, counters and plen/rlen are summed and
 * converted with pa_bytes_to_usec(), i.e. treated as byte counts. */
struct snapshot {
    /* playback side */
    pa_usec_t sink_now;
    pa_usec_t sink_latency;
    size_t sink_delay;
    int64_t send_counter;

    /* capture side */
    pa_usec_t source_now;
    pa_usec_t source_latency;
    size_t source_delay;
    int64_t recv_counter;
    /* NOTE(review): presumably the lengths of the source (rlen) and sink
     * (plen) memblockqs at snapshot time -- confirm where they are filled in */
    size_t rlen;
    size_t plen;
};
185
186 struct userdata {
187     pa_core *core;
188     pa_module *module;
189
190     pa_bool_t autoloaded;
191     pa_bool_t dead;
192     pa_bool_t save_aec;
193
194     pa_echo_canceller *ec;
195     uint32_t blocksize;
196
197     pa_bool_t need_realign;
198
199     /* to wakeup the source I/O thread */
200     pa_asyncmsgq *asyncmsgq;
201     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
202
203     pa_source *source;
204     pa_bool_t source_auto_desc;
205     pa_source_output *source_output;
206     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
207     size_t source_skip;
208
209     pa_sink *sink;
210     pa_bool_t sink_auto_desc;
211     pa_sink_input *sink_input;
212     pa_memblockq *sink_memblockq;
213     int64_t send_counter;          /* updated in sink IO thread */
214     int64_t recv_counter;
215     size_t sink_skip;
216
217     /* Bytes left over from previous iteration */
218     size_t sink_rem;
219     size_t source_rem;
220
221     pa_atomic_t request_resync;
222
223     pa_time_event *time_event;
224     pa_usec_t adjust_time;
225     int adjust_threshold;
226
227     FILE *captured_file;
228     FILE *played_file;
229     FILE *canceled_file;
230     FILE *drift_file;
231
232     pa_bool_t use_volume_sharing;
233
234     struct {
235         pa_cvolume current_volume;
236     } thread_info;
237 };
238
/* Forward declaration: do_resync() calls this before its definition. */
static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
240
/* NULL-terminated list of the module argument names this module accepts. */
static const char* const valid_modargs[] = {
    /* virtual source */
    "source_name", "source_properties", "source_master",
    /* virtual sink */
    "sink_name", "sink_properties", "sink_master",
    /* drift adjustment */
    "adjust_time", "adjust_threshold",
    /* sample spec */
    "format", "rate", "channels", "channel_map",
    /* canceller selection and debugging */
    "aec_method", "aec_args", "save_aec",
    /* misc */
    "autoloaded", "use_volume_sharing",
    NULL
};
261
262 enum {
263     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
264     SOURCE_OUTPUT_MESSAGE_REWIND,
265     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
266     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
267 };
268
269 enum {
270     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
271 };
272
273 enum {
274     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
275 };
276
277 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
278     int64_t buffer, diff_time, buffer_latency;
279
280     /* get the number of samples between capture and playback */
281     if (snapshot->plen > snapshot->rlen)
282         buffer = snapshot->plen - snapshot->rlen;
283     else
284         buffer = 0;
285
286     buffer += snapshot->source_delay + snapshot->sink_delay;
287
288     /* add the amount of samples not yet transferred to the source context */
289     if (snapshot->recv_counter <= snapshot->send_counter)
290         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
291     else
292         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
293
294     /* convert to time */
295     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
296
297     /* capture and playback samples are perfectly aligned when diff_time is 0 */
298     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
299           (snapshot->source_now - snapshot->source_latency);
300
301     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
302         (long long) snapshot->sink_latency,
303         (long long) buffer_latency, (long long) snapshot->source_latency,
304         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
305         (long long) (snapshot->send_counter - snapshot->recv_counter),
306         (long long) (snapshot->sink_now - snapshot->source_now));
307
308     return diff_time;
309 }
310
311 /* Called from main context */
312 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
313     struct userdata *u = userdata;
314     uint32_t old_rate, base_rate, new_rate;
315     int64_t diff_time;
316     /*size_t fs*/
317     struct snapshot latency_snapshot;
318
319     pa_assert(u);
320     pa_assert(a);
321     pa_assert(u->time_event == e);
322     pa_assert_ctl_context();
323
324     if (!IS_ACTIVE(u))
325         return;
326
327     /* update our snapshots */
328     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
329     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
330
331     /* calculate drift between capture and playback */
332     diff_time = calc_diff(u, &latency_snapshot);
333
334     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
335     old_rate = u->sink_input->sample_spec.rate;
336     base_rate = u->source_output->sample_spec.rate;
337
338     if (diff_time < 0) {
339         /* recording before playback, we need to adjust quickly. The echo
340          * canceler does not work in this case. */
341         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
342             NULL, diff_time, NULL, NULL);
343         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
344         new_rate = base_rate;
345     }
346     else {
347         if (diff_time > u->adjust_threshold) {
348             /* diff too big, quickly adjust */
349             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
350                 NULL, diff_time, NULL, NULL);
351         }
352
353         /* recording behind playback, we need to slowly adjust the rate to match */
354         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
355
356         /* assume equal samplerates for now */
357         new_rate = base_rate;
358     }
359
360     /* make sure we don't make too big adjustments because that sounds horrible */
361     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
362         new_rate = base_rate;
363
364     if (new_rate != old_rate) {
365         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
366
367         pa_sink_input_set_rate(u->sink_input, new_rate);
368     }
369
370     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
371 }
372
373 /* Called from source I/O thread context */
374 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
375     struct userdata *u = PA_SOURCE(o)->userdata;
376
377     switch (code) {
378
379         case PA_SOURCE_MESSAGE_GET_LATENCY:
380
381             /* The source is _put() before the source output is, so let's
382              * make sure we don't access it in that time. Also, the
383              * source output is first shut down, the source second. */
384             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
385                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
386                 *((pa_usec_t*) data) = 0;
387                 return 0;
388             }
389
390             *((pa_usec_t*) data) =
391
392                 /* Get the latency of the master source */
393                 pa_source_get_latency_within_thread(u->source_output->source) +
394                 /* Add the latency internal to our source output on top */
395                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
396                 /* and the buffering we do on the source */
397                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
398
399             return 0;
400
401         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
402             u->thread_info.current_volume = u->source->reference_volume;
403             break;
404     }
405
406     return pa_source_process_msg(o, code, data, offset, chunk);
407 }
408
409 /* Called from sink I/O thread context */
410 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
411     struct userdata *u = PA_SINK(o)->userdata;
412
413     switch (code) {
414
415         case PA_SINK_MESSAGE_GET_LATENCY:
416
417             /* The sink is _put() before the sink input is, so let's
418              * make sure we don't access it in that time. Also, the
419              * sink input is first shut down, the sink second. */
420             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
421                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
422                 *((pa_usec_t*) data) = 0;
423                 return 0;
424             }
425
426             *((pa_usec_t*) data) =
427
428                 /* Get the latency of the master sink */
429                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
430
431                 /* Add the latency internal to our sink input on top */
432                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
433
434             return 0;
435     }
436
437     return pa_sink_process_msg(o, code, data, offset, chunk);
438 }
439
440
441 /* Called from main context */
442 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
443     struct userdata *u;
444
445     pa_source_assert_ref(s);
446     pa_assert_se(u = s->userdata);
447
448     if (!PA_SOURCE_IS_LINKED(state) ||
449         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
450         return 0;
451
452     if (state == PA_SOURCE_RUNNING) {
453         /* restart timer when both sink and source are active */
454         if (IS_ACTIVE(u) && u->adjust_time)
455             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
456
457         pa_atomic_store(&u->request_resync, 1);
458         pa_source_output_cork(u->source_output, FALSE);
459     } else if (state == PA_SOURCE_SUSPENDED) {
460         pa_source_output_cork(u->source_output, TRUE);
461     }
462
463     return 0;
464 }
465
466 /* Called from main context */
467 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
468     struct userdata *u;
469
470     pa_sink_assert_ref(s);
471     pa_assert_se(u = s->userdata);
472
473     if (!PA_SINK_IS_LINKED(state) ||
474         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
475         return 0;
476
477     if (state == PA_SINK_RUNNING) {
478         /* restart timer when both sink and source are active */
479         if (IS_ACTIVE(u) && u->adjust_time)
480             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
481
482         pa_atomic_store(&u->request_resync, 1);
483         pa_sink_input_cork(u->sink_input, FALSE);
484     } else if (state == PA_SINK_SUSPENDED) {
485         pa_sink_input_cork(u->sink_input, TRUE);
486     }
487
488     return 0;
489 }
490
491 /* Called from I/O thread context */
492 static void source_update_requested_latency_cb(pa_source *s) {
493     struct userdata *u;
494
495     pa_source_assert_ref(s);
496     pa_assert_se(u = s->userdata);
497
498     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
499         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
500         return;
501
502     pa_log_debug("Source update requested latency");
503
504     /* Just hand this one over to the master source */
505     pa_source_output_set_requested_latency_within_thread(
506             u->source_output,
507             pa_source_get_requested_latency_within_thread(s));
508 }
509
510 /* Called from I/O thread context */
511 static void sink_update_requested_latency_cb(pa_sink *s) {
512     struct userdata *u;
513
514     pa_sink_assert_ref(s);
515     pa_assert_se(u = s->userdata);
516
517     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
518         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
519         return;
520
521     pa_log_debug("Sink update requested latency");
522
523     /* Just hand this one over to the master sink */
524     pa_sink_input_set_requested_latency_within_thread(
525             u->sink_input,
526             pa_sink_get_requested_latency_within_thread(s));
527 }
528
529 /* Called from I/O thread context */
530 static void sink_request_rewind_cb(pa_sink *s) {
531     struct userdata *u;
532
533     pa_sink_assert_ref(s);
534     pa_assert_se(u = s->userdata);
535
536     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
537         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
538         return;
539
540     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
541
542     /* Just hand this one over to the master sink */
543     pa_sink_input_request_rewind(u->sink_input,
544                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
545 }
546
547 /* Called from main context */
548 static void source_set_volume_cb(pa_source *s) {
549     struct userdata *u;
550
551     pa_source_assert_ref(s);
552     pa_assert_se(u = s->userdata);
553
554     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
555         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
556         return;
557
558     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
559 }
560
561 /* Called from main context */
562 static void sink_set_volume_cb(pa_sink *s) {
563     struct userdata *u;
564
565     pa_sink_assert_ref(s);
566     pa_assert_se(u = s->userdata);
567
568     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
569         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
570         return;
571
572     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
573 }
574
575 static void source_get_volume_cb(pa_source *s) {
576     struct userdata *u;
577     pa_cvolume v;
578
579     pa_source_assert_ref(s);
580     pa_assert_se(u = s->userdata);
581
582     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
583         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
584         return;
585
586     pa_source_output_get_volume(u->source_output, &v, TRUE);
587
588     if (pa_cvolume_equal(&s->real_volume, &v))
589         /* no change */
590         return;
591
592     s->real_volume = v;
593     pa_source_set_soft_volume(s, NULL);
594 }
595
596 /* Called from main context */
597 static void source_set_mute_cb(pa_source *s) {
598     struct userdata *u;
599
600     pa_source_assert_ref(s);
601     pa_assert_se(u = s->userdata);
602
603     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
604         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
605         return;
606
607     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
608 }
609
610 /* Called from main context */
611 static void sink_set_mute_cb(pa_sink *s) {
612     struct userdata *u;
613
614     pa_sink_assert_ref(s);
615     pa_assert_se(u = s->userdata);
616
617     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
618         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
619         return;
620
621     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
622 }
623
624 /* Called from main context */
625 static void source_get_mute_cb(pa_source *s) {
626     struct userdata *u;
627
628     pa_source_assert_ref(s);
629     pa_assert_se(u = s->userdata);
630
631     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
632         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
633         return;
634
635     pa_source_output_get_mute(u->source_output);
636 }
637
638 /* must be called from the input thread context */
639 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
640     int64_t diff;
641
642     if (diff_time < 0) {
643         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
644
645         if (diff > 0) {
646             /* add some extra safety samples to compensate for jitter in the
647              * timings */
648             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
649
650             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
651
652             u->sink_skip = diff;
653             u->source_skip = 0;
654         }
655     } else if (diff_time > 0) {
656         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
657
658         if (diff > 0) {
659             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
660
661             u->source_skip = diff;
662             u->sink_skip = 0;
663         }
664     }
665 }
666
667 /* must be called from the input thread */
668 static void do_resync(struct userdata *u) {
669     int64_t diff_time;
670     struct snapshot latency_snapshot;
671
672     pa_log("Doing resync");
673
674     /* update our snapshot */
675     source_output_snapshot_within_thread(u, &latency_snapshot);
676     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
677
678     /* calculate drift between capture and playback */
679     diff_time = calc_diff(u, &latency_snapshot);
680
681     /* and adjust for the drift */
682     apply_diff_time(u, diff_time);
683 }
684
685 /* 1. Calculate drift at this point, pass to canceller
686  * 2. Push out playback samples in blocksize chunks
687  * 3. Push out capture samples in blocksize chunks
688  * 4. ???
689  * 5. Profit
690  */
691 static void do_push_drift_comp(struct userdata *u) {
692     size_t rlen, plen;
693     pa_memchunk rchunk, pchunk, cchunk;
694     uint8_t *rdata, *pdata, *cdata;
695     float drift;
696     int unused;
697
698     rlen = pa_memblockq_get_length(u->source_memblockq);
699     plen = pa_memblockq_get_length(u->sink_memblockq);
700
701     /* Estimate snapshot drift as follows:
702      *   pd: amount of data consumed since last time
703      *   rd: amount of data consumed since last time
704      *
705      *   drift = (pd - rd) / rd;
706      *
707      * We calculate pd and rd as the memblockq length less the number of
708      * samples left from the last iteration (to avoid double counting
709      * those remainder samples.
710      */
711     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
712     u->sink_rem = plen % u->blocksize;
713     u->source_rem = rlen % u->blocksize;
714
715     /* Now let the canceller work its drift compensation magic */
716     u->ec->set_drift(u->ec, drift);
717
718     if (u->save_aec) {
719         if (u->drift_file)
720             fprintf(u->drift_file, "d %a\n", drift);
721     }
722
723     /* Send in the playback samples first */
724     while (plen >= u->blocksize) {
725         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
726         pdata = pa_memblock_acquire(pchunk.memblock);
727         pdata += pchunk.index;
728
729         u->ec->play(u->ec, pdata);
730
731         if (u->save_aec) {
732             if (u->drift_file)
733                 fprintf(u->drift_file, "p %d\n", u->blocksize);
734             if (u->played_file)
735                 unused = fwrite(pdata, 1, u->blocksize, u->played_file);
736         }
737
738         pa_memblock_release(pchunk.memblock);
739         pa_memblockq_drop(u->sink_memblockq, u->blocksize);
740         pa_memblock_unref(pchunk.memblock);
741
742         plen -= u->blocksize;
743     }
744
745     /* And now the capture samples */
746     while (rlen >= u->blocksize) {
747         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
748
749         rdata = pa_memblock_acquire(rchunk.memblock);
750         rdata += rchunk.index;
751
752         cchunk.index = 0;
753         cchunk.length = u->blocksize;
754         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
755         cdata = pa_memblock_acquire(cchunk.memblock);
756
757         u->ec->record(u->ec, rdata, cdata);
758
759         if (u->save_aec) {
760             if (u->drift_file)
761                 fprintf(u->drift_file, "c %d\n", u->blocksize);
762             if (u->captured_file)
763                 unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
764             if (u->canceled_file)
765                 unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
766         }
767
768         pa_memblock_release(cchunk.memblock);
769         pa_memblock_release(rchunk.memblock);
770
771         pa_memblock_unref(rchunk.memblock);
772
773         pa_source_post(u->source, &cchunk);
774         pa_memblock_unref(cchunk.memblock);
775
776         pa_memblockq_drop(u->source_memblockq, u->blocksize);
777         rlen -= u->blocksize;
778     }
779 }
780
781 /* This one's simpler than the drift compensation case -- we just iterate over
782  * the capture buffer, and pass the canceller blocksize bytes of playback and
783  * capture data. */
784 static void do_push(struct userdata *u) {
785     size_t rlen, plen;
786     pa_memchunk rchunk, pchunk, cchunk;
787     uint8_t *rdata, *pdata, *cdata;
788     int unused;
789
790     rlen = pa_memblockq_get_length(u->source_memblockq);
791     plen = pa_memblockq_get_length(u->sink_memblockq);
792
793     while (rlen >= u->blocksize) {
794         /* take fixed block from recorded samples */
795         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
796
797         if (plen > u->blocksize) {
798             if (plen > u->blocksize) {
799                 /* take fixed block from played samples */
800                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
801
802                 rdata = pa_memblock_acquire(rchunk.memblock);
803                 rdata += rchunk.index;
804                 pdata = pa_memblock_acquire(pchunk.memblock);
805                 pdata += pchunk.index;
806
807                 cchunk.index = 0;
808                 cchunk.length = u->blocksize;
809                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
810                 cdata = pa_memblock_acquire(cchunk.memblock);
811
812                 if (u->save_aec) {
813                     if (u->captured_file)
814                         unused = fwrite(rdata, 1, u->blocksize, u->captured_file);
815                     if (u->played_file)
816                         unused = fwrite(pdata, 1, u->blocksize, u->played_file);
817                 }
818
819                 /* perform echo cancellation */
820                 u->ec->run(u->ec, rdata, pdata, cdata);
821
822                 if (u->save_aec) {
823                     if (u->canceled_file)
824                         unused = fwrite(cdata, 1, u->blocksize, u->canceled_file);
825                 }
826
827                 pa_memblock_release(cchunk.memblock);
828                 pa_memblock_release(pchunk.memblock);
829                 pa_memblock_release(rchunk.memblock);
830
831                 /* drop consumed sink samples */
832                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
833                 pa_memblock_unref(pchunk.memblock);
834
835                 pa_memblock_unref(rchunk.memblock);
836                 /* the filtered samples now become the samples from our
837                  * source */
838                 rchunk = cchunk;
839
840                 plen -= u->blocksize;
841             }
842         }
843
844         /* forward the (echo-canceled) data to the virtual source */
845         pa_source_post(u->source, &rchunk);
846         pa_memblock_unref(rchunk.memblock);
847
848         pa_memblockq_drop(u->source_memblockq, u->blocksize);
849         rlen -= u->blocksize;
850     }
851 }
852
853 /* Called from input thread context */
854 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
855     struct userdata *u;
856     size_t rlen, plen, to_skip;
857     pa_memchunk rchunk;
858
859     pa_source_output_assert_ref(o);
860     pa_source_output_assert_io_context(o);
861     pa_assert_se(u = o->userdata);
862
863     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
864         pa_log("push when no link?");
865         return;
866     }
867
868     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
869                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
870         pa_source_post(u->source, chunk);
871         return;
872     }
873
874     /* handle queued messages, do any message sending of our own */
875     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
876         ;
877
878     pa_memblockq_push_align(u->source_memblockq, chunk);
879
880     rlen = pa_memblockq_get_length(u->source_memblockq);
881     plen = pa_memblockq_get_length(u->sink_memblockq);
882
883     /* Let's not do anything else till we have enough data to process */
884     if (rlen < u->blocksize)
885         return;
886
887     /* See if we need to drop samples in order to sync */
888     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
889         do_resync(u);
890     }
891
892     /* Okay, skip cancellation for skipped source samples if needed. */
893     if (PA_UNLIKELY(u->source_skip)) {
894         /* The slightly tricky bit here is that we drop all but modulo
895          * blocksize bytes and then adjust for that last bit on the sink side.
896          * We do this because the source data is coming at a fixed rate, which
897          * means the only way to try to catch up is drop sink samples and let
898          * the canceller cope up with this. */
899         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
900         to_skip -= to_skip % u->blocksize;
901
902         if (to_skip) {
903             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
904             pa_source_post(u->source, &rchunk);
905
906             pa_memblock_unref(rchunk.memblock);
907             pa_memblockq_drop(u->source_memblockq, u->blocksize);
908
909             rlen -= to_skip;
910             u->source_skip -= to_skip;
911         }
912
913         if (rlen && u->source_skip % u->blocksize) {
914             u->sink_skip += u->blocksize - (u->source_skip % u->blocksize);
915             u->source_skip -= (u->source_skip % u->blocksize);
916         }
917     }
918
919     /* And for the sink, these samples have been played back already, so we can
920      * just drop them and get on with it. */
921     if (PA_UNLIKELY(u->sink_skip)) {
922         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
923
924         pa_memblockq_drop(u->sink_memblockq, to_skip);
925
926         plen -= to_skip;
927         u->sink_skip -= to_skip;
928     }
929
930     /* process and push out samples */
931     if (u->ec->params.drift_compensation)
932         do_push_drift_comp(u);
933     else
934         do_push(u);
935 }
936
937 /* Called from I/O thread context */
938 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
939     struct userdata *u;
940
941     pa_sink_input_assert_ref(i);
942     pa_assert(chunk);
943     pa_assert_se(u = i->userdata);
944
945     if (u->sink->thread_info.rewind_requested)
946         pa_sink_process_rewind(u->sink, 0);
947
948     pa_sink_render_full(u->sink, nbytes, chunk);
949
950     if (i->thread_info.underrun_for > 0) {
951         pa_log_debug("Handling end of underrun.");
952         pa_atomic_store(&u->request_resync, 1);
953     }
954
955     /* let source thread handle the chunk. pass the sample count as well so that
956      * the source IO thread can update the right variables. */
957     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
958         NULL, 0, chunk, NULL);
959     u->send_counter += chunk->length;
960
961     return 0;
962 }
963
964 /* Called from input thread context */
965 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
966     struct userdata *u;
967
968     pa_source_output_assert_ref(o);
969     pa_source_output_assert_io_context(o);
970     pa_assert_se(u = o->userdata);
971
972     pa_source_process_rewind(u->source, nbytes);
973
974     /* go back on read side, we need to use older sink data for this */
975     pa_memblockq_rewind(u->sink_memblockq, nbytes);
976
977     /* manipulate write index */
978     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
979
980     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
981         (long long) pa_memblockq_get_length (u->source_memblockq));
982 }
983
984 /* Called from I/O thread context */
985 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
986     struct userdata *u;
987
988     pa_sink_input_assert_ref(i);
989     pa_assert_se(u = i->userdata);
990
991     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
992
993     pa_sink_process_rewind(u->sink, nbytes);
994
995     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
996     u->send_counter -= nbytes;
997 }
998
999 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1000     size_t delay, rlen, plen;
1001     pa_usec_t now, latency;
1002
1003     now = pa_rtclock_now();
1004     latency = pa_source_get_latency_within_thread(u->source_output->source);
1005     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1006
1007     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1008     rlen = pa_memblockq_get_length(u->source_memblockq);
1009     plen = pa_memblockq_get_length(u->sink_memblockq);
1010
1011     snapshot->source_now = now;
1012     snapshot->source_latency = latency;
1013     snapshot->source_delay = delay;
1014     snapshot->recv_counter = u->recv_counter;
1015     snapshot->rlen = rlen + u->sink_skip;
1016     snapshot->plen = plen + u->source_skip;
1017 }
1018
1019
1020 /* Called from output thread context */
1021 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1022     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1023
1024     switch (code) {
1025
1026         case SOURCE_OUTPUT_MESSAGE_POST:
1027
1028             pa_source_output_assert_io_context(u->source_output);
1029
1030             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1031                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1032             else
1033                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1034
1035             u->recv_counter += (int64_t) chunk->length;
1036
1037             return 0;
1038
1039         case SOURCE_OUTPUT_MESSAGE_REWIND:
1040             pa_source_output_assert_io_context(u->source_output);
1041
1042             /* manipulate write index, never go past what we have */
1043             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1044                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1045             else
1046                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1047
1048             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1049
1050             u->recv_counter -= offset;
1051
1052             return 0;
1053
1054         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1055             struct snapshot *snapshot = (struct snapshot *) data;
1056
1057             source_output_snapshot_within_thread(u, snapshot);
1058             return 0;
1059         }
1060
1061         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1062             apply_diff_time(u, offset);
1063             return 0;
1064
1065     }
1066
1067     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1068 }
1069
1070 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1071     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1072
1073     switch (code) {
1074
1075         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1076             size_t delay;
1077             pa_usec_t now, latency;
1078             struct snapshot *snapshot = (struct snapshot *) data;
1079
1080             pa_sink_input_assert_io_context(u->sink_input);
1081
1082             now = pa_rtclock_now();
1083             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1084             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1085
1086             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1087
1088             snapshot->sink_now = now;
1089             snapshot->sink_latency = latency;
1090             snapshot->sink_delay = delay;
1091             snapshot->send_counter = u->send_counter;
1092             return 0;
1093         }
1094     }
1095
1096     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1097 }
1098
1099 /* Called from I/O thread context */
1100 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1101     struct userdata *u;
1102
1103     pa_sink_input_assert_ref(i);
1104     pa_assert_se(u = i->userdata);
1105
1106     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1107
1108     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1109     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1110 }
1111
1112 /* Called from I/O thread context */
1113 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1114     struct userdata *u;
1115
1116     pa_source_output_assert_ref(o);
1117     pa_assert_se(u = o->userdata);
1118
1119     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1120
1121     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1122 }
1123
1124 /* Called from I/O thread context */
1125 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1126     struct userdata *u;
1127
1128     pa_sink_input_assert_ref(i);
1129     pa_assert_se(u = i->userdata);
1130
1131     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1132
1133     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1134 }
1135
1136 /* Called from I/O thread context */
1137 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1138     struct userdata *u;
1139     pa_usec_t latency;
1140
1141     pa_sink_input_assert_ref(i);
1142     pa_assert_se(u = i->userdata);
1143
1144     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1145
1146     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1147 }
1148
1149 /* Called from I/O thread context */
1150 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1151     struct userdata *u;
1152     pa_usec_t latency;
1153
1154     pa_source_output_assert_ref(o);
1155     pa_assert_se(u = o->userdata);
1156
1157     latency = pa_source_get_requested_latency_within_thread(o->source);
1158
1159     pa_log_debug("source output update requested latency %lld", (long long) latency);
1160 }
1161
1162 /* Called from I/O thread context */
1163 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1164     struct userdata *u;
1165
1166     pa_sink_input_assert_ref(i);
1167     pa_assert_se(u = i->userdata);
1168
1169     pa_log_debug("Sink input update latency range %lld %lld",
1170         (long long) i->sink->thread_info.min_latency,
1171         (long long) i->sink->thread_info.max_latency);
1172
1173     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1174 }
1175
1176 /* Called from I/O thread context */
1177 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1178     struct userdata *u;
1179
1180     pa_source_output_assert_ref(o);
1181     pa_assert_se(u = o->userdata);
1182
1183     pa_log_debug("Source output update latency range %lld %lld",
1184         (long long) o->source->thread_info.min_latency,
1185         (long long) o->source->thread_info.max_latency);
1186
1187     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1188 }
1189
1190 /* Called from I/O thread context */
1191 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1192     struct userdata *u;
1193
1194     pa_sink_input_assert_ref(i);
1195     pa_assert_se(u = i->userdata);
1196
1197     pa_log_debug("Sink input update fixed latency %lld",
1198         (long long) i->sink->thread_info.fixed_latency);
1199
1200     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1201 }
1202
1203 /* Called from I/O thread context */
1204 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1205     struct userdata *u;
1206
1207     pa_source_output_assert_ref(o);
1208     pa_assert_se(u = o->userdata);
1209
1210     pa_log_debug("Source output update fixed latency %lld",
1211         (long long) o->source->thread_info.fixed_latency);
1212
1213     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1214 }
1215
1216 /* Called from output thread context */
1217 static void source_output_attach_cb(pa_source_output *o) {
1218     struct userdata *u;
1219
1220     pa_source_output_assert_ref(o);
1221     pa_source_output_assert_io_context(o);
1222     pa_assert_se(u = o->userdata);
1223
1224     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1225     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1226     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1227     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1228
1229     pa_log_debug("Source output %d attach", o->index);
1230
1231     pa_source_attach_within_thread(u->source);
1232
1233     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1234             o->source->thread_info.rtpoll,
1235             PA_RTPOLL_LATE,
1236             u->asyncmsgq);
1237 }
1238
1239 /* Called from I/O thread context */
1240 static void sink_input_attach_cb(pa_sink_input *i) {
1241     struct userdata *u;
1242
1243     pa_sink_input_assert_ref(i);
1244     pa_assert_se(u = i->userdata);
1245
1246     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1247     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1248
1249     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1250      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1251     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1252
1253     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1254      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1255      * HERE. SEE (6) */
1256     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1257     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1258
1259     pa_log_debug("Sink input %d attach", i->index);
1260
1261     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1262             i->sink->thread_info.rtpoll,
1263             PA_RTPOLL_LATE,
1264             u->asyncmsgq);
1265
1266     pa_sink_attach_within_thread(u->sink);
1267 }
1268
1269
1270 /* Called from output thread context */
1271 static void source_output_detach_cb(pa_source_output *o) {
1272     struct userdata *u;
1273
1274     pa_source_output_assert_ref(o);
1275     pa_source_output_assert_io_context(o);
1276     pa_assert_se(u = o->userdata);
1277
1278     pa_source_detach_within_thread(u->source);
1279     pa_source_set_rtpoll(u->source, NULL);
1280
1281     pa_log_debug("Source output %d detach", o->index);
1282
1283     if (u->rtpoll_item_read) {
1284         pa_rtpoll_item_free(u->rtpoll_item_read);
1285         u->rtpoll_item_read = NULL;
1286     }
1287 }
1288
1289 /* Called from I/O thread context */
1290 static void sink_input_detach_cb(pa_sink_input *i) {
1291     struct userdata *u;
1292
1293     pa_sink_input_assert_ref(i);
1294     pa_assert_se(u = i->userdata);
1295
1296     pa_sink_detach_within_thread(u->sink);
1297
1298     pa_sink_set_rtpoll(u->sink, NULL);
1299
1300     pa_log_debug("Sink input %d detach", i->index);
1301
1302     if (u->rtpoll_item_write) {
1303         pa_rtpoll_item_free(u->rtpoll_item_write);
1304         u->rtpoll_item_write = NULL;
1305     }
1306 }
1307
1308 /* Called from output thread context */
1309 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1310     struct userdata *u;
1311
1312     pa_source_output_assert_ref(o);
1313     pa_source_output_assert_io_context(o);
1314     pa_assert_se(u = o->userdata);
1315
1316     pa_log_debug("Source output %d state %d", o->index, state);
1317 }
1318
1319 /* Called from IO thread context */
1320 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1321     struct userdata *u;
1322
1323     pa_sink_input_assert_ref(i);
1324     pa_assert_se(u = i->userdata);
1325
1326     pa_log_debug("Sink input %d state %d", i->index, state);
1327
1328     /* If we are added for the first time, ask for a rewinding so that
1329      * we are heard right-away. */
1330     if (PA_SINK_INPUT_IS_LINKED(state) &&
1331         i->thread_info.state == PA_SINK_INPUT_INIT) {
1332         pa_log_debug("Requesting rewind due to state change.");
1333         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1334     }
1335 }
1336
1337 /* Called from main thread */
1338 static void source_output_kill_cb(pa_source_output *o) {
1339     struct userdata *u;
1340
1341     pa_source_output_assert_ref(o);
1342     pa_assert_ctl_context();
1343     pa_assert_se(u = o->userdata);
1344
1345     u->dead = TRUE;
1346
1347     /* The order here matters! We first kill the source output, followed
1348      * by the source. That means the source callbacks must be protected
1349      * against an unconnected source output! */
1350     pa_source_output_unlink(u->source_output);
1351     pa_source_unlink(u->source);
1352
1353     pa_source_output_unref(u->source_output);
1354     u->source_output = NULL;
1355
1356     pa_source_unref(u->source);
1357     u->source = NULL;
1358
1359     pa_log_debug("Source output kill %d", o->index);
1360
1361     pa_module_unload_request(u->module, TRUE);
1362 }
1363
1364 /* Called from main context */
1365 static void sink_input_kill_cb(pa_sink_input *i) {
1366     struct userdata *u;
1367
1368     pa_sink_input_assert_ref(i);
1369     pa_assert_se(u = i->userdata);
1370
1371     u->dead = TRUE;
1372
1373     /* The order here matters! We first kill the sink input, followed
1374      * by the sink. That means the sink callbacks must be protected
1375      * against an unconnected sink input! */
1376     pa_sink_input_unlink(u->sink_input);
1377     pa_sink_unlink(u->sink);
1378
1379     pa_sink_input_unref(u->sink_input);
1380     u->sink_input = NULL;
1381
1382     pa_sink_unref(u->sink);
1383     u->sink = NULL;
1384
1385     pa_log_debug("Sink input kill %d", i->index);
1386
1387     pa_module_unload_request(u->module, TRUE);
1388 }
1389
1390 /* Called from main thread */
1391 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1392     struct userdata *u;
1393
1394     pa_source_output_assert_ref(o);
1395     pa_assert_ctl_context();
1396     pa_assert_se(u = o->userdata);
1397
1398     if (u->dead)
1399         return FALSE;
1400
1401     return (u->source != dest) && (u->sink != dest->monitor_of);
1402 }
1403
1404 /* Called from main context */
1405 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1406     struct userdata *u;
1407
1408     pa_sink_input_assert_ref(i);
1409     pa_assert_se(u = i->userdata);
1410
1411     if (u->dead)
1412         return FALSE;
1413
1414     return u->sink != dest;
1415 }
1416
1417 /* Called from main thread */
1418 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1419     struct userdata *u;
1420
1421     pa_source_output_assert_ref(o);
1422     pa_assert_ctl_context();
1423     pa_assert_se(u = o->userdata);
1424
1425     if (dest) {
1426         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1427         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1428     } else
1429         pa_source_set_asyncmsgq(u->source, NULL);
1430
1431     if (u->source_auto_desc && dest) {
1432         const char *z;
1433         pa_proplist *pl;
1434
1435         pl = pa_proplist_new();
1436         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1437         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s",
1438                          pa_proplist_gets(u->source->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1439
1440         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1441         pa_proplist_free(pl);
1442     }
1443 }
1444
1445 /* Called from main context */
1446 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1447     struct userdata *u;
1448
1449     pa_sink_input_assert_ref(i);
1450     pa_assert_se(u = i->userdata);
1451
1452     if (dest) {
1453         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1454         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1455     } else
1456         pa_sink_set_asyncmsgq(u->sink, NULL);
1457
1458     if (u->sink_auto_desc && dest) {
1459         const char *z;
1460         pa_proplist *pl;
1461
1462         pl = pa_proplist_new();
1463         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1464         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s",
1465                          pa_proplist_gets(u->sink->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1466
1467         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1468         pa_proplist_free(pl);
1469     }
1470 }
1471
1472 /* Called from main context */
1473 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1474     struct userdata *u;
1475
1476     pa_sink_input_assert_ref(i);
1477     pa_assert_se(u = i->userdata);
1478
1479     pa_sink_volume_changed(u->sink, &i->volume);
1480 }
1481
1482 /* Called from main context */
1483 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1484     struct userdata *u;
1485
1486     pa_sink_input_assert_ref(i);
1487     pa_assert_se(u = i->userdata);
1488
1489     pa_sink_mute_changed(u->sink, i->muted);
1490 }
1491
1492 /* Called from main context */
1493 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1494     struct pa_echo_canceller_msg *msg;
1495     struct userdata *u;
1496
1497     pa_assert(o);
1498
1499     msg = PA_ECHO_CANCELLER_MSG(o);
1500     u = msg->userdata;
1501
1502     switch (code) {
1503         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1504             pa_cvolume *v = (pa_cvolume *) userdata;
1505
1506             if (u->use_volume_sharing)
1507                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1508             else
1509                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1510
1511             break;
1512         }
1513
1514         default:
1515             pa_assert_not_reached();
1516             break;
1517     }
1518
1519     return 0;
1520 }
1521
1522 /* Called by the canceller, so thread context */
1523 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1524     *v = ec->msg->userdata->thread_info.current_volume;
1525 }
1526
1527 /* Called by the canceller, so thread context */
1528 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1529     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1530         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1531
1532         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1533                 pa_xfree);
1534     }
1535 }
1536
1537 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1538     if (pa_streq(method, "speex"))
1539         return PA_ECHO_CANCELLER_SPEEX;
1540     else if (pa_streq(method, "adrian"))
1541         return PA_ECHO_CANCELLER_ADRIAN;
1542 #ifdef HAVE_WEBRTC
1543     else if (pa_streq(method, "webrtc"))
1544         return PA_ECHO_CANCELLER_WEBRTC;
1545 #endif
1546     else
1547         return PA_ECHO_CANCELLER_INVALID;
1548 }
1549
1550 /* Common initialisation bits between module-echo-cancel and the standalone test program */
1551 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1552     pa_echo_canceller_method_t ec_method;
1553
1554     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1555         pa_log("Invalid sample format specification or channel map");
1556         goto fail;
1557     }
1558
1559     u->ec = pa_xnew0(pa_echo_canceller, 1);
1560     if (!u->ec) {
1561         pa_log("Failed to alloc echo canceller");
1562         goto fail;
1563     }
1564
1565     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1566         pa_log("Invalid echo canceller implementation");
1567         goto fail;
1568     }
1569
1570     u->ec->init = ec_table[ec_method].init;
1571     u->ec->play = ec_table[ec_method].play;
1572     u->ec->record = ec_table[ec_method].record;
1573     u->ec->set_drift = ec_table[ec_method].set_drift;
1574     u->ec->run = ec_table[ec_method].run;
1575     u->ec->done = ec_table[ec_method].done;
1576
1577     return 0;
1578
1579 fail:
1580     return -1;
1581 }
1582
1583
1584 int pa__init(pa_module*m) {
1585     struct userdata *u;
1586     pa_sample_spec source_ss, sink_ss;
1587     pa_channel_map source_map, sink_map;
1588     pa_modargs *ma;
1589     pa_source *source_master=NULL;
1590     pa_sink *sink_master=NULL;
1591     pa_source_output_new_data source_output_data;
1592     pa_sink_input_new_data sink_input_data;
1593     pa_source_new_data source_data;
1594     pa_sink_new_data sink_data;
1595     pa_memchunk silence;
1596     uint32_t temp;
1597
1598     pa_assert(m);
1599
1600     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1601         pa_log("Failed to parse module arguments.");
1602         goto fail;
1603     }
1604
1605     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1606         pa_log("Master source not found");
1607         goto fail;
1608     }
1609     pa_assert(source_master);
1610
1611     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1612         pa_log("Master sink not found");
1613         goto fail;
1614     }
1615     pa_assert(sink_master);
1616
1617     if (source_master->monitor_of == sink_master) {
1618         pa_log("Can't cancel echo between a sink and its monitor");
1619         goto fail;
1620     }
1621
1622     source_ss = source_master->sample_spec;
1623     source_ss.rate = DEFAULT_RATE;
1624     source_ss.channels = DEFAULT_CHANNELS;
1625     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1626
1627     sink_ss = sink_master->sample_spec;
1628     sink_map = sink_master->channel_map;
1629
1630     u = pa_xnew0(struct userdata, 1);
1631     if (!u) {
1632         pa_log("Failed to alloc userdata");
1633         goto fail;
1634     }
1635     u->core = m->core;
1636     u->module = m;
1637     m->userdata = u;
1638     u->dead = FALSE;
1639
1640     u->use_volume_sharing = TRUE;
1641     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1642         pa_log("use_volume_sharing= expects a boolean argument");
1643         goto fail;
1644     }
1645
1646     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1647     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1648         pa_log("Failed to parse adjust_time value");
1649         goto fail;
1650     }
1651
1652     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1653         u->adjust_time = temp * PA_USEC_PER_SEC;
1654     else
1655         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1656
1657     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1658     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1659         pa_log("Failed to parse adjust_threshold value");
1660         goto fail;
1661     }
1662
1663     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1664         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1665     else
1666         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1667
1668     u->save_aec = DEFAULT_SAVE_AEC;
1669     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1670         pa_log("Failed to parse save_aec value");
1671         goto fail;
1672     }
1673
1674     u->autoloaded = DEFAULT_AUTOLOADED;
1675     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1676         pa_log("Failed to parse autoloaded value");
1677         goto fail;
1678     }
1679
1680     if (init_common(ma, u, &source_ss, &source_map))
1681         goto fail;
1682
1683     u->asyncmsgq = pa_asyncmsgq_new(0);
1684     u->need_realign = TRUE;
1685
1686     if (u->ec->init) {
1687         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1688             pa_log("Failed to init AEC engine");
1689             goto fail;
1690         }
1691     }
1692
1693     if (u->ec->params.drift_compensation)
1694         pa_assert(u->ec->set_drift);
1695
1696     /* Create source */
1697     pa_source_new_data_init(&source_data);
1698     source_data.driver = __FILE__;
1699     source_data.module = m;
1700     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1701         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1702     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1703     pa_source_new_data_set_channel_map(&source_data, &source_map);
1704     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1705     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1706     if (!u->autoloaded)
1707         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1708     pa_proplist_sets(source_data.proplist, "device.echo-cancel.name", source_data.name);
1709
1710     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1711         pa_log("Invalid properties");
1712         pa_source_new_data_done(&source_data);
1713         goto fail;
1714     }
1715
1716     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1717         const char *z;
1718
1719         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1720         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s", source_data.name, z ? z : source_master->name);
1721     }
1722
1723     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1724                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1725     pa_source_new_data_done(&source_data);
1726
1727     if (!u->source) {
1728         pa_log("Failed to create source.");
1729         goto fail;
1730     }
1731
1732     u->source->parent.process_msg = source_process_msg_cb;
1733     u->source->set_state = source_set_state_cb;
1734     u->source->update_requested_latency = source_update_requested_latency_cb;
1735     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1736     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1737     if (!u->use_volume_sharing) {
1738         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1739         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1740         pa_source_enable_decibel_volume(u->source, TRUE);
1741     }
1742     u->source->userdata = u;
1743
1744     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1745
1746     /* Create sink */
1747     pa_sink_new_data_init(&sink_data);
1748     sink_data.driver = __FILE__;
1749     sink_data.module = m;
1750     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1751         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1752     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1753     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1754     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1755     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1756     if (!u->autoloaded)
1757         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1758     pa_proplist_sets(sink_data.proplist, "device.echo-cancel.name", sink_data.name);
1759
1760     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1761         pa_log("Invalid properties");
1762         pa_sink_new_data_done(&sink_data);
1763         goto fail;
1764     }
1765
1766     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1767         const char *z;
1768
1769         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1770         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s", sink_data.name, z ? z : sink_master->name);
1771     }
1772
1773     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1774                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1775     pa_sink_new_data_done(&sink_data);
1776
1777     if (!u->sink) {
1778         pa_log("Failed to create sink.");
1779         goto fail;
1780     }
1781
1782     u->sink->parent.process_msg = sink_process_msg_cb;
1783     u->sink->set_state = sink_set_state_cb;
1784     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1785     u->sink->request_rewind = sink_request_rewind_cb;
1786     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1787     if (!u->use_volume_sharing) {
1788         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1789         pa_sink_enable_decibel_volume(u->sink, TRUE);
1790     }
1791     u->sink->userdata = u;
1792
1793     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1794
1795     /* Create source output */
1796     pa_source_output_new_data_init(&source_output_data);
1797     source_output_data.driver = __FILE__;
1798     source_output_data.module = m;
1799     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1800     source_output_data.destination_source = u->source;
1801     /* FIXME
1802        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1803
1804     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1805     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1806     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1807     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1808
1809     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1810     pa_source_output_new_data_done(&source_output_data);
1811
1812     if (!u->source_output)
1813         goto fail;
1814
1815     u->source_output->parent.process_msg = source_output_process_msg_cb;
1816     u->source_output->push = source_output_push_cb;
1817     u->source_output->process_rewind = source_output_process_rewind_cb;
1818     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1819     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1820     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1821     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1822     u->source_output->kill = source_output_kill_cb;
1823     u->source_output->attach = source_output_attach_cb;
1824     u->source_output->detach = source_output_detach_cb;
1825     u->source_output->state_change = source_output_state_change_cb;
1826     u->source_output->may_move_to = source_output_may_move_to_cb;
1827     u->source_output->moving = source_output_moving_cb;
1828     u->source_output->userdata = u;
1829
1830     u->source->output_from_master = u->source_output;
1831
1832     /* Create sink input */
1833     pa_sink_input_new_data_init(&sink_input_data);
1834     sink_input_data.driver = __FILE__;
1835     sink_input_data.module = m;
1836     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1837     sink_input_data.origin_sink = u->sink;
1838     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1839     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1840     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1841     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1842     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1843
1844     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1845     pa_sink_input_new_data_done(&sink_input_data);
1846
1847     if (!u->sink_input)
1848         goto fail;
1849
1850     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1851     u->sink_input->pop = sink_input_pop_cb;
1852     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1853     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1854     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1855     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1856     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1857     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1858     u->sink_input->kill = sink_input_kill_cb;
1859     u->sink_input->attach = sink_input_attach_cb;
1860     u->sink_input->detach = sink_input_detach_cb;
1861     u->sink_input->state_change = sink_input_state_change_cb;
1862     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1863     u->sink_input->moving = sink_input_moving_cb;
1864     if (!u->use_volume_sharing)
1865         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1866     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1867     u->sink_input->userdata = u;
1868
1869     u->sink->input_to_master = u->sink_input;
1870
1871     pa_sink_input_get_silence(u->sink_input, &silence);
1872
1873     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1874         &source_ss, 1, 1, 0, &silence);
1875     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1876         &sink_ss, 1, 1, 0, &silence);
1877
1878     pa_memblock_unref(silence.memblock);
1879
1880     if (!u->source_memblockq || !u->sink_memblockq) {
1881         pa_log("Failed to create memblockq.");
1882         goto fail;
1883     }
1884
1885     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1886         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1887     else if (u->ec->params.drift_compensation) {
1888         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1889         u->adjust_time = 0;
1890         /* Perform resync just once to give the canceller a leg up */
1891         pa_atomic_store(&u->request_resync, 1);
1892     }
1893
1894     if (u->save_aec) {
1895         pa_log("Creating AEC files in /tmp");
1896         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1897         if (u->captured_file == NULL)
1898             perror ("fopen failed");
1899         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1900         if (u->played_file == NULL)
1901             perror ("fopen failed");
1902         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1903         if (u->canceled_file == NULL)
1904             perror ("fopen failed");
1905         if (u->ec->params.drift_compensation) {
1906             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1907             if (u->drift_file == NULL)
1908                 perror ("fopen failed");
1909         }
1910     }
1911
1912     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1913     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1914     u->ec->msg->userdata = u;
1915
1916     u->thread_info.current_volume = u->source->reference_volume;
1917
1918     pa_sink_put(u->sink);
1919     pa_source_put(u->source);
1920
1921     pa_sink_input_put(u->sink_input);
1922     pa_source_output_put(u->source_output);
1923     pa_modargs_free(ma);
1924
1925     return 0;
1926
1927 fail:
1928     if (ma)
1929         pa_modargs_free(ma);
1930
1931     pa__done(m);
1932
1933     return -1;
1934 }
1935
1936 int pa__get_n_used(pa_module *m) {
1937     struct userdata *u;
1938
1939     pa_assert(m);
1940     pa_assert_se(u = m->userdata);
1941
1942     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1943 }
1944
1945 void pa__done(pa_module*m) {
1946     struct userdata *u;
1947
1948     pa_assert(m);
1949
1950     if (!(u = m->userdata))
1951         return;
1952
1953     u->dead = TRUE;
1954
1955     /* See comments in source_output_kill_cb() above regarding
1956      * destruction order! */
1957
1958     if (u->time_event)
1959         u->core->mainloop->time_free(u->time_event);
1960
1961     if (u->source_output)
1962         pa_source_output_unlink(u->source_output);
1963     if (u->sink_input)
1964         pa_sink_input_unlink(u->sink_input);
1965
1966     if (u->source)
1967         pa_source_unlink(u->source);
1968     if (u->sink)
1969         pa_sink_unlink(u->sink);
1970
1971     if (u->source_output)
1972         pa_source_output_unref(u->source_output);
1973     if (u->sink_input)
1974         pa_sink_input_unref(u->sink_input);
1975
1976     if (u->source)
1977         pa_source_unref(u->source);
1978     if (u->sink)
1979         pa_sink_unref(u->sink);
1980
1981     if (u->source_memblockq)
1982         pa_memblockq_free(u->source_memblockq);
1983     if (u->sink_memblockq)
1984         pa_memblockq_free(u->sink_memblockq);
1985
1986     if (u->ec) {
1987         if (u->ec->done)
1988             u->ec->done(u->ec);
1989
1990         pa_xfree(u->ec);
1991     }
1992
1993     if (u->asyncmsgq)
1994         pa_asyncmsgq_unref(u->asyncmsgq);
1995
1996     if (u->save_aec) {
1997         if (u->played_file)
1998             fclose(u->played_file);
1999         if (u->captured_file)
2000             fclose(u->captured_file);
2001         if (u->canceled_file)
2002             fclose(u->canceled_file);
2003         if (u->drift_file)
2004             fclose(u->drift_file);
2005     }
2006
2007     pa_xfree(u);
2008 }
2009
#ifdef ECHO_CANCEL_TEST
/*
 * Stand-alone test program for running the canceller on pre-recorded files.
 *
 * Usage: <prog> play_file rec_file out_file [module args] [aec_args] [drift_file]
 *
 * Without drift compensation the played and captured files are consumed one
 * block (u.blocksize bytes) at a time and the cancelled result is written to
 * out_file. With drift compensation the drift file drives processing; each
 * record starts with a tag character:
 *   'd' <float>  -> set_drift()
 *   'c' <bytes>  -> read <bytes> of capture data, record(), write result
 *   'p' <bytes>  -> read <bytes> of playback data, play()
 *
 * Returns 0 on success and -1 on any failure (bad usage, I/O error, malformed
 * drift file, canceller init failure).
 */
int main(int argc, char* argv[]) {
    struct userdata u;
    pa_sample_spec source_ss, sink_ss;
    pa_channel_map source_map, sink_map;
    pa_modargs *ma = NULL;
    uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
    int ret = 0;
    int i = 0;          /* last block size read from the drift file */
    int ec_ready = 0;   /* set once u.ec->init() succeeded, so done() is owed */
    char c;
    float drift;

    pa_memzero(&u, sizeof(u));

    if (argc < 4 || argc > 7) {
        goto usage;
    }

    u.ec = pa_xnew0(pa_echo_canceller, 1);
    if (!u.ec) {
        pa_log("Failed to alloc echo canceller");
        goto fail;
    }

    /* Capture and playback inputs are raw sample data: open them in binary mode. */
    u.captured_file = fopen(argv[2], "rb");
    if (u.captured_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.played_file = fopen(argv[1], "rb");
    if (u.played_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }
    u.canceled_file = fopen(argv[3], "wb");
    if (u.canceled_file == NULL) {
        perror ("fopen failed");
        goto fail;
    }

    u.core = pa_xnew0(pa_core, 1);
    u.core->cpu_info.cpu_type = PA_CPU_X86;
    u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;

    if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
        pa_log("Failed to parse module arguments.");
        goto fail;
    }

    source_ss.format = PA_SAMPLE_S16LE;
    source_ss.rate = DEFAULT_RATE;
    source_ss.channels = DEFAULT_CHANNELS;
    pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);

    /* There is no real sink here; seed the sink side from the source format
     * so that init() is never handed indeterminate values. */
    sink_ss = source_ss;
    sink_map = source_map;

    /* NOTE(review): the result of init_common() is not checked here -- confirm
     * whether it can report failure and handle it if so. */
    init_common(ma, &u, &source_ss, &source_map);

    /* aec_args is the 5th positional argument, so it exists only if argc > 5. */
    if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &u.blocksize,
                     (argc > 5) ? argv[5] : NULL )) {
        pa_log("Failed to init AEC engine");
        goto fail;
    }
    ec_ready = 1;

    if (u.ec->params.drift_compensation) {
        if (argc < 7) {
            pa_log("Drift compensation enabled but drift file not specified");
            goto fail;
        }

        u.drift_file = fopen(argv[6], "r");

        if (u.drift_file == NULL) {
            perror ("fopen failed");
            goto fail;
        }
    }

    rdata = pa_xmalloc(u.blocksize);
    pdata = pa_xmalloc(u.blocksize);
    cdata = pa_xmalloc(u.blocksize);

    if (!u.ec->params.drift_compensation) {
        while (fread(rdata, u.blocksize, 1, u.captured_file) > 0) {
            if (fread(pdata, u.blocksize, 1, u.played_file) == 0) {
                perror("Played file ended before captured file");
                goto fail;
            }

            u.ec->run(u.ec, rdata, pdata, cdata);

            if (fwrite(cdata, u.blocksize, 1, u.canceled_file) != 1) {
                perror("fwrite failed");
                goto fail;
            }
        }
    } else {
        while (fscanf(u.drift_file, "%c", &c) > 0) {
            switch (c) {
                case 'd':
                    /* fscanf() yields EOF (non-zero) at end of input, so
                     * compare against the expected conversion count. */
                    if (fscanf(u.drift_file, "%a", &drift) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    u.ec->set_drift(u.ec, drift);

                    break;

                case 'c':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    /* The buffers hold u.blocksize bytes; refuse anything
                     * that would overrun them. */
                    if (i <= 0 || (size_t) i > (size_t) u.blocksize) {
                        pa_log("Invalid block size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(rdata, (size_t) i, 1, u.captured_file) == 0) {
                        perror("Captured file ended prematurely");
                        goto fail;
                    }

                    u.ec->record(u.ec, rdata, cdata);

                    if (fwrite(cdata, (size_t) i, 1, u.canceled_file) != 1) {
                        perror("fwrite failed");
                        goto fail;
                    }

                    break;

                case 'p':
                    if (fscanf(u.drift_file, "%d", &i) != 1) {
                        perror("Drift file incomplete");
                        goto fail;
                    }

                    if (i <= 0 || (size_t) i > (size_t) u.blocksize) {
                        pa_log("Invalid block size %d in drift file", i);
                        goto fail;
                    }

                    if (fread(pdata, (size_t) i, 1, u.played_file) == 0) {
                        perror("Played file ended prematurely");
                        goto fail;
                    }

                    u.ec->play(u.ec, pdata);

                    break;

                default:
                    /* Separators (blanks, newlines) between records and
                     * unknown tags are skipped. */
                    break;
            }
        }

        /* i is the last block size seen (0 if the drift file had no 'c'/'p'
         * records, in which case these reads return 0). */
        if (fread(rdata, (size_t) i, 1, u.captured_file) > 0)
            pa_log("All capture data was not consumed");
        if (fread(pdata, (size_t) i, 1, u.played_file) > 0)
            pa_log("All playback data was not consumed");
    }

out:
    /* Shared by the success and failure paths, so nothing stays open or
     * allocated on an early exit. */
    if (ec_ready)
        u.ec->done(u.ec);

    if (u.captured_file)
        fclose(u.captured_file);
    if (u.played_file)
        fclose(u.played_file);
    if (u.canceled_file && fclose(u.canceled_file) != 0) {
        /* Buffered output is flushed on close; a failure means lost data. */
        perror("fclose failed");
        ret = -1;
    }
    if (u.drift_file)
        fclose(u.drift_file);

    pa_xfree(rdata);
    pa_xfree(pdata);
    pa_xfree(cdata);

    pa_xfree(u.ec);
    pa_xfree(u.core);

    if (ma)
        pa_modargs_free(ma);

    return ret;

usage:
    pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);

fail:
    ret = -1;
    goto out;
}
#endif /* ECHO_CANCEL_TEST */