Fix indent
[platform/upstream/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include <pulsecore/protocol-native.h>
57 #include <pulsecore/pstream-util.h>
58
59 #include "module-echo-cancel-symdef.h"
60
61 PA_MODULE_AUTHOR("Wim Taymans");
62 PA_MODULE_DESCRIPTION("Echo Cancellation");
63 PA_MODULE_VERSION(PACKAGE_VERSION);
64 PA_MODULE_LOAD_ONCE(false);
65 PA_MODULE_USAGE(
66         _("source_name=<name for the source> "
67           "source_properties=<properties for the source> "
68           "source_master=<name of source to filter> "
69           "sink_name=<name for the sink> "
70           "sink_properties=<properties for the sink> "
71           "sink_master=<name of sink to filter> "
72           "adjust_time=<how often to readjust rates in s> "
73           "adjust_threshold=<how much drift to readjust after in ms> "
74           "format=<sample format> "
75           "rate=<sample rate> "
76           "channels=<number of channels> "
77           "channel_map=<channel map> "
78           "aec_method=<implementation to use> "
79           "aec_args=<parameters for the AEC engine> "
80           "save_aec=<save AEC data in /tmp> "
81           "autoloaded=<set if this module is being loaded automatically> "
82           "use_volume_sharing=<yes or no> "
83         ));
84
85 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
86 typedef enum {
87     PA_ECHO_CANCELLER_INVALID = -1,
88     PA_ECHO_CANCELLER_NULL,
89 #ifdef HAVE_SPEEX
90     PA_ECHO_CANCELLER_SPEEX,
91 #endif
92 #ifdef HAVE_ADRIAN_EC
93     PA_ECHO_CANCELLER_ADRIAN,
94 #endif
95 #ifdef HAVE_WEBRTC
96     PA_ECHO_CANCELLER_WEBRTC,
97 #endif
98 } pa_echo_canceller_method_t;
99
100 enum {
101     AEC_SET_VOLUME,
102     AEC_SET_DEVICE,
103 };
104
105 #ifdef HAVE_WEBRTC
106 #define DEFAULT_ECHO_CANCELLER "webrtc"
107 #else
108 #define DEFAULT_ECHO_CANCELLER "speex"
109 #endif
110
111 static const pa_echo_canceller ec_table[] = {
112     {
113         /* Null, Dummy echo canceller (just copies data) */
114         .init                   = pa_null_ec_init,
115         .run                    = pa_null_ec_run,
116         .done                   = pa_null_ec_done,
117     },
118 #ifdef HAVE_SPEEX
119     {
120         /* Speex */
121         .init                   = pa_speex_ec_init,
122         .run                    = pa_speex_ec_run,
123         .done                   = pa_speex_ec_done,
124     },
125 #endif
126 #ifdef HAVE_ADRIAN_EC
127     {
128         /* Adrian Andre's NLMS implementation */
129         .init                   = pa_adrian_ec_init,
130         .run                    = pa_adrian_ec_run,
131         .done                   = pa_adrian_ec_done,
132     },
133 #endif
134 #ifdef HAVE_WEBRTC
135     {
136         /* WebRTC's audio processing engine */
137         .init                   = pa_webrtc_ec_init,
138         .play                   = pa_webrtc_ec_play,
139         .record                 = pa_webrtc_ec_record,
140         .set_drift              = pa_webrtc_ec_set_drift,
141         .run                    = pa_webrtc_ec_run,
142         .done                   = pa_webrtc_ec_done,
143     },
144 #endif
145 };
146
147 #define DEFAULT_RATE 32000
148 #define DEFAULT_CHANNELS 1
149 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
150 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
151 #define DEFAULT_SAVE_AEC false
152 #define DEFAULT_AUTOLOADED false
153
154 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
155
156 /* Can only be used in main context */
157 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
158                       (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
159
160 /* This module creates a new (virtual) source and sink.
161  *
162  * The data sent to the new sink is kept in a memblockq before being
163  * forwarded to the real sink_master.
164  *
165  * Data read from source_master is matched against the saved sink data and
166  * echo canceled data is then pushed onto the new source.
167  *
168  * Both source and sink masters have their own threads to push/pull data
169  * respectively. We however perform all our actions in the source IO thread.
170  * To do this we send all played samples to the source IO thread where they
171  * are then pushed into the memblockq.
172  *
173  * Alignment is performed in two steps:
174  *
175  * 1) when something happens that requires quick adjustment of the alignment of
176  *    capture and playback samples, we perform a resync. This adjusts the
177  *    position in the playback memblock to the requested sample. Quick
178  *    adjustments include moving the playback samples before the capture
179  *    samples (because else the echo canceler does not work) or when the
180  *    playback pointer drifts too far away.
181  *
182  * 2) periodically check the difference between capture and playback. We use a
183  *    low and high watermark for adjusting the alignment. Playback should always
184  *    be before capture and the difference should not be bigger than one frame
185  *    size. We would ideally like to resample the sink_input but most driver
186  *    don't give enough accuracy to be able to do that right now.
187  */
188
189 struct userdata;
190
191 struct pa_echo_canceller_msg {
192     pa_msgobject parent;
193     struct userdata *userdata;
194 };
195
196 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
197 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
198
199 struct snapshot {
200     pa_usec_t sink_now;
201     pa_usec_t sink_latency;
202     size_t sink_delay;
203     int64_t send_counter;
204
205     pa_usec_t source_now;
206     pa_usec_t source_latency;
207     size_t source_delay;
208     int64_t recv_counter;
209     size_t rlen;
210     size_t plen;
211 };
212
213 struct userdata {
214     pa_core *core;
215     pa_module *module;
216
217     bool autoloaded;
218     bool dead;
219     bool save_aec;
220
221     pa_echo_canceller *ec;
222     uint32_t source_output_blocksize;
223     uint32_t source_blocksize;
224     uint32_t sink_blocksize;
225
226     bool need_realign;
227
228     /* to wakeup the source I/O thread */
229     pa_asyncmsgq *asyncmsgq;
230     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
231
232     pa_source *source;
233     bool source_auto_desc;
234     pa_source_output *source_output;
235     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
236     size_t source_skip;
237
238     pa_sink *sink;
239     bool sink_auto_desc;
240     pa_sink_input *sink_input;
241     pa_memblockq *sink_memblockq;
242     int64_t send_counter;          /* updated in sink IO thread */
243     int64_t recv_counter;
244     size_t sink_skip;
245
246     /* Bytes left over from previous iteration */
247     size_t sink_rem;
248     size_t source_rem;
249
250     pa_atomic_t request_resync;
251
252     pa_time_event *time_event;
253     pa_usec_t adjust_time;
254     int adjust_threshold;
255
256     FILE *captured_file;
257     FILE *played_file;
258     FILE *canceled_file;
259     FILE *drift_file;
260
261     bool use_volume_sharing;
262
263     struct {
264         pa_cvolume current_volume;
265     } thread_info;
266
267     pa_native_protocol *protocol;
268 };
269
270 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
271
272 static const char* const valid_modargs[] = {
273     "source_name",
274     "source_properties",
275     "source_master",
276     "sink_name",
277     "sink_properties",
278     "sink_master",
279     "adjust_time",
280     "adjust_threshold",
281     "format",
282     "rate",
283     "channels",
284     "channel_map",
285     "aec_method",
286     "aec_args",
287     "save_aec",
288     "autoloaded",
289     "use_volume_sharing",
290     NULL
291 };
292
293 enum {
294     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
295     SOURCE_OUTPUT_MESSAGE_REWIND,
296     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
297     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
298 };
299
300 enum {
301     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
302 };
303
304 enum {
305     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
306 };
307
308 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
309     int64_t diff_time, buffer_latency;
310     pa_usec_t plen, rlen, source_delay, sink_delay, recv_counter, send_counter;
311
312     /* get latency difference between playback and record */
313     plen = pa_bytes_to_usec(snapshot->plen, &u->sink_input->sample_spec);
314     rlen = pa_bytes_to_usec(snapshot->rlen, &u->source_output->sample_spec);
315     if (plen > rlen)
316         buffer_latency = plen - rlen;
317     else
318         buffer_latency = 0;
319
320     source_delay = pa_bytes_to_usec(snapshot->source_delay, &u->source_output->sample_spec);
321     sink_delay = pa_bytes_to_usec(snapshot->sink_delay, &u->sink_input->sample_spec);
322     buffer_latency += source_delay + sink_delay;
323
324     /* add the latency difference due to samples not yet transferred */
325     send_counter = pa_bytes_to_usec(snapshot->send_counter, &u->sink->sample_spec);
326     recv_counter = pa_bytes_to_usec(snapshot->recv_counter, &u->sink->sample_spec);
327     if (recv_counter <= send_counter)
328         buffer_latency += (int64_t) (send_counter - recv_counter);
329     else
330         buffer_latency += PA_CLIP_SUB(buffer_latency, (int64_t) (recv_counter - send_counter));
331
332     /* capture and playback are perfectly aligned when diff_time is 0 */
333     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
334           (snapshot->source_now - snapshot->source_latency);
335
336     pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
337         (long long) snapshot->sink_latency,
338         (long long) buffer_latency, (long long) snapshot->source_latency,
339         (long long) source_delay, (long long) sink_delay,
340         (long long) (send_counter - recv_counter),
341         (long long) (snapshot->sink_now - snapshot->source_now));
342
343     return diff_time;
344 }
345
346 /* Called from main context */
347 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
348     struct userdata *u = userdata;
349     uint32_t old_rate, base_rate, new_rate;
350     int64_t diff_time;
351     /*size_t fs*/
352     struct snapshot latency_snapshot;
353
354     pa_assert(u);
355     pa_assert(a);
356     pa_assert(u->time_event == e);
357     pa_assert_ctl_context();
358
359     if (!IS_ACTIVE(u))
360         return;
361
362     /* update our snapshots */
363     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
364     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
365
366     /* calculate drift between capture and playback */
367     diff_time = calc_diff(u, &latency_snapshot);
368
369     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
370     old_rate = u->sink_input->sample_spec.rate;
371     base_rate = u->source_output->sample_spec.rate;
372
373     if (diff_time < 0) {
374         /* recording before playback, we need to adjust quickly. The echo
375          * canceler does not work in this case. */
376         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
377             NULL, diff_time, NULL, NULL);
378         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
379         new_rate = base_rate;
380     }
381     else {
382         if (diff_time > u->adjust_threshold) {
383             /* diff too big, quickly adjust */
384             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
385                 NULL, diff_time, NULL, NULL);
386         }
387
388         /* recording behind playback, we need to slowly adjust the rate to match */
389         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
390
391         /* assume equal samplerates for now */
392         new_rate = base_rate;
393     }
394
395     /* make sure we don't make too big adjustments because that sounds horrible */
396     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
397         new_rate = base_rate;
398
399     if (new_rate != old_rate) {
400         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
401
402         pa_sink_input_set_rate(u->sink_input, new_rate);
403     }
404
405     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
406 }
407
408 /* Called from source I/O thread context */
409 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
410     struct userdata *u = PA_SOURCE(o)->userdata;
411
412     switch (code) {
413
414         case PA_SOURCE_MESSAGE_GET_LATENCY:
415
416             /* The source is _put() before the source output is, so let's
417              * make sure we don't access it in that time. Also, the
418              * source output is first shut down, the source second. */
419             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
420                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
421                 *((pa_usec_t*) data) = 0;
422                 return 0;
423             }
424
425             *((pa_usec_t*) data) =
426
427                 /* Get the latency of the master source */
428                 pa_source_get_latency_within_thread(u->source_output->source) +
429                 /* Add the latency internal to our source output on top */
430                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
431                 /* and the buffering we do on the source */
432                 pa_bytes_to_usec(u->source_output_blocksize, &u->source_output->source->sample_spec);
433
434             return 0;
435
436         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
437             u->thread_info.current_volume = u->source->reference_volume;
438             break;
439     }
440
441     return pa_source_process_msg(o, code, data, offset, chunk);
442 }
443
444 /* Called from sink I/O thread context */
445 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
446     struct userdata *u = PA_SINK(o)->userdata;
447
448     switch (code) {
449
450         case PA_SINK_MESSAGE_GET_LATENCY:
451
452             /* The sink is _put() before the sink input is, so let's
453              * make sure we don't access it in that time. Also, the
454              * sink input is first shut down, the sink second. */
455             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
456                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
457                 *((pa_usec_t*) data) = 0;
458                 return 0;
459             }
460
461             *((pa_usec_t*) data) =
462
463                 /* Get the latency of the master sink */
464                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
465
466                 /* Add the latency internal to our sink input on top */
467                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
468
469             return 0;
470     }
471
472     return pa_sink_process_msg(o, code, data, offset, chunk);
473 }
474
475 /* Called from main context */
476 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
477     struct userdata *u;
478
479     pa_source_assert_ref(s);
480     pa_assert_se(u = s->userdata);
481
482     if (!PA_SOURCE_IS_LINKED(state) ||
483         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
484         return 0;
485
486     if (state == PA_SOURCE_RUNNING) {
487         /* restart timer when both sink and source are active */
488         if (IS_ACTIVE(u) && u->adjust_time)
489             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
490
491         pa_atomic_store(&u->request_resync, 1);
492         pa_source_output_cork(u->source_output, false);
493     } else if (state == PA_SOURCE_SUSPENDED) {
494         pa_source_output_cork(u->source_output, true);
495     }
496
497     return 0;
498 }
499
500 /* Called from main context */
501 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
502     struct userdata *u;
503
504     pa_sink_assert_ref(s);
505     pa_assert_se(u = s->userdata);
506
507     if (!PA_SINK_IS_LINKED(state) ||
508         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
509         return 0;
510
511     if (state == PA_SINK_RUNNING) {
512         /* restart timer when both sink and source are active */
513         if (IS_ACTIVE(u) && u->adjust_time)
514             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
515
516         pa_atomic_store(&u->request_resync, 1);
517         pa_sink_input_cork(u->sink_input, false);
518     } else if (state == PA_SINK_SUSPENDED) {
519         pa_sink_input_cork(u->sink_input, true);
520     }
521
522     return 0;
523 }
524
525 /* Called from source I/O thread context */
526 static void source_update_requested_latency_cb(pa_source *s) {
527     struct userdata *u;
528
529     pa_source_assert_ref(s);
530     pa_assert_se(u = s->userdata);
531
532     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
533         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
534         return;
535
536     pa_log_debug("Source update requested latency");
537
538     /* Just hand this one over to the master source */
539     pa_source_output_set_requested_latency_within_thread(
540             u->source_output,
541             pa_source_get_requested_latency_within_thread(s));
542 }
543
544 /* Called from sink I/O thread context */
545 static void sink_update_requested_latency_cb(pa_sink *s) {
546     struct userdata *u;
547
548     pa_sink_assert_ref(s);
549     pa_assert_se(u = s->userdata);
550
551     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
552         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
553         return;
554
555     pa_log_debug("Sink update requested latency");
556
557     /* Just hand this one over to the master sink */
558     pa_sink_input_set_requested_latency_within_thread(
559             u->sink_input,
560             pa_sink_get_requested_latency_within_thread(s));
561 }
562
563 /* Called from sink I/O thread context */
564 static void sink_request_rewind_cb(pa_sink *s) {
565     struct userdata *u;
566
567     pa_sink_assert_ref(s);
568     pa_assert_se(u = s->userdata);
569
570     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
571         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
572         return;
573
574     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
575
576     /* Just hand this one over to the master sink */
577     pa_sink_input_request_rewind(u->sink_input,
578                                  s->thread_info.rewind_nbytes, true, false, false);
579 }
580
581 /* Called from main context */
582 static void source_set_volume_cb(pa_source *s) {
583     struct userdata *u;
584
585     pa_source_assert_ref(s);
586     pa_assert_se(u = s->userdata);
587
588     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
589         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
590         return;
591
592     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, true);
593 }
594
595 /* Called from main context */
596 static void sink_set_volume_cb(pa_sink *s) {
597     struct userdata *u;
598
599     pa_sink_assert_ref(s);
600     pa_assert_se(u = s->userdata);
601
602     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
603         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
604         return;
605
606     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, true);
607 }
608
609 /* Called from main context. */
610 static void source_get_volume_cb(pa_source *s) {
611     struct userdata *u;
612     pa_cvolume v;
613
614     pa_source_assert_ref(s);
615     pa_assert_se(u = s->userdata);
616
617     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
618         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
619         return;
620
621     pa_source_output_get_volume(u->source_output, &v, true);
622
623     if (pa_cvolume_equal(&s->real_volume, &v))
624         /* no change */
625         return;
626
627     s->real_volume = v;
628     pa_source_set_soft_volume(s, NULL);
629 }
630
631 /* Called from main context */
632 static void source_set_mute_cb(pa_source *s) {
633     struct userdata *u;
634
635     pa_source_assert_ref(s);
636     pa_assert_se(u = s->userdata);
637
638     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
639         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
640         return;
641
642     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
643 }
644
645 /* Called from main context */
646 static void sink_set_mute_cb(pa_sink *s) {
647     struct userdata *u;
648
649     pa_sink_assert_ref(s);
650     pa_assert_se(u = s->userdata);
651
652     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
653         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
654         return;
655
656     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
657 }
658
659 /* Called from source I/O thread context. */
660 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
661     int64_t diff;
662
663     if (diff_time < 0) {
664         diff = pa_usec_to_bytes(-diff_time, &u->sink_input->sample_spec);
665
666         if (diff > 0) {
667             /* add some extra safety samples to compensate for jitter in the
668              * timings */
669             diff += 10 * pa_frame_size (&u->sink_input->sample_spec);
670
671             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
672
673             u->sink_skip = diff;
674             u->source_skip = 0;
675         }
676     } else if (diff_time > 0) {
677         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
678
679         if (diff > 0) {
680             pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
681
682             u->source_skip = diff;
683             u->sink_skip = 0;
684         }
685     }
686 }
687
688 /* Called from source I/O thread context. */
689 static void do_resync(struct userdata *u) {
690     int64_t diff_time;
691     struct snapshot latency_snapshot;
692
693     pa_log("Doing resync");
694
695     /* update our snapshot */
696     source_output_snapshot_within_thread(u, &latency_snapshot);
697     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
698
699     /* calculate drift between capture and playback */
700     diff_time = calc_diff(u, &latency_snapshot);
701
702     /* and adjust for the drift */
703     apply_diff_time(u, diff_time);
704 }
705
706 /* 1. Calculate drift at this point, pass to canceller
707  * 2. Push out playback samples in blocksize chunks
708  * 3. Push out capture samples in blocksize chunks
709  * 4. ???
710  * 5. Profit
711  *
712  * Called from source I/O thread context.
713  */
714 static void do_push_drift_comp(struct userdata *u) {
715     size_t rlen, plen;
716     pa_memchunk rchunk, pchunk, cchunk;
717     uint8_t *rdata, *pdata, *cdata;
718     float drift;
719     int unused PA_GCC_UNUSED;
720
721     rlen = pa_memblockq_get_length(u->source_memblockq);
722     plen = pa_memblockq_get_length(u->sink_memblockq);
723
724     /* Estimate snapshot drift as follows:
725      *   pd: amount of data consumed since last time
726      *   rd: amount of data consumed since last time
727      *
728      *   drift = (pd - rd) / rd;
729      *
730      * We calculate pd and rd as the memblockq length less the number of
731      * samples left from the last iteration (to avoid double counting
732      * those remainder samples.
733      */
734     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
735     u->sink_rem = plen % u->sink_blocksize;
736     u->source_rem = rlen % u->source_output_blocksize;
737
738     /* Now let the canceller work its drift compensation magic */
739     u->ec->set_drift(u->ec, drift);
740
741     if (u->save_aec) {
742         if (u->drift_file)
743             fprintf(u->drift_file, "d %a\n", drift);
744     }
745
746     /* Send in the playback samples first */
747     while (plen >= u->sink_blocksize) {
748         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
749         pdata = pa_memblock_acquire(pchunk.memblock);
750         pdata += pchunk.index;
751
752         u->ec->play(u->ec, pdata);
753
754         if (u->save_aec) {
755             if (u->drift_file)
756                 fprintf(u->drift_file, "p %d\n", u->sink_blocksize);
757             if (u->played_file)
758                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
759         }
760
761         pa_memblock_release(pchunk.memblock);
762         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
763         pa_memblock_unref(pchunk.memblock);
764
765         plen -= u->sink_blocksize;
766     }
767
768     /* And now the capture samples */
769     while (rlen >= u->source_output_blocksize) {
770         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
771
772         rdata = pa_memblock_acquire(rchunk.memblock);
773         rdata += rchunk.index;
774
775         cchunk.index = 0;
776         cchunk.length = u->source_output_blocksize;
777         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
778         cdata = pa_memblock_acquire(cchunk.memblock);
779
780         u->ec->record(u->ec, rdata, cdata);
781
782         if (u->save_aec) {
783             if (u->drift_file)
784                 fprintf(u->drift_file, "c %d\n", u->source_output_blocksize);
785             if (u->captured_file)
786                 unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
787             if (u->canceled_file)
788                 unused = fwrite(cdata, 1, u->source_output_blocksize, u->canceled_file);
789         }
790
791         pa_memblock_release(cchunk.memblock);
792         pa_memblock_release(rchunk.memblock);
793
794         pa_memblock_unref(rchunk.memblock);
795
796         pa_source_post(u->source, &cchunk);
797         pa_memblock_unref(cchunk.memblock);
798
799         pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
800         rlen -= u->source_output_blocksize;
801     }
802 }
803
804 /* This one's simpler than the drift compensation case -- we just iterate over
805  * the capture buffer, and pass the canceller blocksize bytes of playback and
806  * capture data.
807  *
808  * Called from source I/O thread context. */
809 static void do_push(struct userdata *u) {
810     size_t rlen, plen;
811     pa_memchunk rchunk, pchunk, cchunk;
812     uint8_t *rdata, *pdata, *cdata;
813     int unused PA_GCC_UNUSED;
814
815     rlen = pa_memblockq_get_length(u->source_memblockq);
816     plen = pa_memblockq_get_length(u->sink_memblockq);
817
818     while (rlen >= u->source_output_blocksize) {
819
820         /* take fixed blocks from recorded and played samples */
821         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
822         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
823
824         /* we ran out of played data and pchunk has been filled with silence bytes */
825         if (plen < u->sink_blocksize)
826             pa_memblockq_seek(u->sink_memblockq, u->sink_blocksize - plen, PA_SEEK_RELATIVE, true);
827
828         rdata = pa_memblock_acquire(rchunk.memblock);
829         rdata += rchunk.index;
830         pdata = pa_memblock_acquire(pchunk.memblock);
831         pdata += pchunk.index;
832
833         cchunk.index = 0;
834         cchunk.length = u->source_blocksize;
835         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
836         cdata = pa_memblock_acquire(cchunk.memblock);
837
838         if (u->save_aec) {
839             if (u->captured_file)
840                 unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
841             if (u->played_file)
842                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
843         }
844
845         /* perform echo cancellation */
846         u->ec->run(u->ec, rdata, pdata, cdata);
847
848         if (u->save_aec) {
849             if (u->canceled_file)
850                 unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
851         }
852
853         pa_memblock_release(cchunk.memblock);
854         pa_memblock_release(pchunk.memblock);
855         pa_memblock_release(rchunk.memblock);
856
857         /* drop consumed source samples */
858         pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
859         pa_memblock_unref(rchunk.memblock);
860         rlen -= u->source_output_blocksize;
861
862         /* drop consumed sink samples */
863         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
864         pa_memblock_unref(pchunk.memblock);
865
866         if (plen >= u->sink_blocksize)
867             plen -= u->sink_blocksize;
868         else
869             plen = 0;
870
871         /* forward the (echo-canceled) data to the virtual source */
872         pa_source_post(u->source, &cchunk);
873         pa_memblock_unref(cchunk.memblock);
874     }
875 }
876
877 /* Called from source I/O thread context. */
878 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
879     struct userdata *u;
880     size_t rlen, plen, to_skip;
881     pa_memchunk rchunk;
882
883     pa_source_output_assert_ref(o);
884     pa_source_output_assert_io_context(o);
885     pa_assert_se(u = o->userdata);
886
887     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
888         pa_log("Push when no link?");
889         return;
890     }
891
892     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
893                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
894         pa_source_post(u->source, chunk);
895         return;
896     }
897
898     /* handle queued messages, do any message sending of our own */
899     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
900         ;
901
902     pa_memblockq_push_align(u->source_memblockq, chunk);
903
904     rlen = pa_memblockq_get_length(u->source_memblockq);
905     plen = pa_memblockq_get_length(u->sink_memblockq);
906
907     /* Let's not do anything else till we have enough data to process */
908     if (rlen < u->source_output_blocksize)
909         return;
910
911     /* See if we need to drop samples in order to sync */
912     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
913         do_resync(u);
914     }
915
916     /* Okay, skip cancellation for skipped source samples if needed. */
917     if (PA_UNLIKELY(u->source_skip)) {
918         /* The slightly tricky bit here is that we drop all but modulo
919          * blocksize bytes and then adjust for that last bit on the sink side.
920          * We do this because the source data is coming at a fixed rate, which
921          * means the only way to try to catch up is drop sink samples and let
922          * the canceller cope up with this. */
923         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
924         to_skip -= to_skip % u->source_output_blocksize;
925
926         if (to_skip) {
927             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
928             pa_source_post(u->source, &rchunk);
929
930             pa_memblock_unref(rchunk.memblock);
931             pa_memblockq_drop(u->source_memblockq, to_skip);
932
933             rlen -= to_skip;
934             u->source_skip -= to_skip;
935         }
936
937         if (rlen && u->source_skip % u->source_output_blocksize) {
938             u->sink_skip += (uint64_t) (u->source_output_blocksize - (u->source_skip % u->source_output_blocksize)) * u->sink_blocksize / u->source_output_blocksize;
939             u->source_skip -= (u->source_skip % u->source_output_blocksize);
940         }
941     }
942
943     /* And for the sink, these samples have been played back already, so we can
944      * just drop them and get on with it. */
945     if (PA_UNLIKELY(u->sink_skip)) {
946         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
947
948         pa_memblockq_drop(u->sink_memblockq, to_skip);
949
950         plen -= to_skip;
951         u->sink_skip -= to_skip;
952     }
953
954     /* process and push out samples */
955     if (u->ec->params.drift_compensation)
956         do_push_drift_comp(u);
957     else
958         do_push(u);
959 }
960
961 /* Called from sink I/O thread context. */
962 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
963     struct userdata *u;
964
965     pa_sink_input_assert_ref(i);
966     pa_assert(chunk);
967     pa_assert_se(u = i->userdata);
968
969     if (u->sink->thread_info.rewind_requested)
970         pa_sink_process_rewind(u->sink, 0);
971
972     pa_sink_render_full(u->sink, nbytes, chunk);
973
974     if (i->thread_info.underrun_for > 0) {
975         pa_log_debug("Handling end of underrun.");
976         pa_atomic_store(&u->request_resync, 1);
977     }
978
979     /* let source thread handle the chunk. pass the sample count as well so that
980      * the source IO thread can update the right variables. */
981     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
982         NULL, 0, chunk, NULL);
983     u->send_counter += chunk->length;
984
985     return 0;
986 }
987
988 /* Called from source I/O thread context. */
989 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
990     struct userdata *u;
991
992     pa_source_output_assert_ref(o);
993     pa_source_output_assert_io_context(o);
994     pa_assert_se(u = o->userdata);
995
996     pa_source_process_rewind(u->source, nbytes);
997
998     /* go back on read side, we need to use older sink data for this */
999     pa_memblockq_rewind(u->sink_memblockq, nbytes);
1000
1001     /* manipulate write index */
1002     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, true);
1003
1004     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
1005         (long long) pa_memblockq_get_length (u->source_memblockq));
1006 }
1007
1008 /* Called from sink I/O thread context. */
1009 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1010     struct userdata *u;
1011
1012     pa_sink_input_assert_ref(i);
1013     pa_assert_se(u = i->userdata);
1014
1015     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1016
1017     pa_sink_process_rewind(u->sink, nbytes);
1018
1019     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1020     u->send_counter -= nbytes;
1021 }
1022
1023 /* Called from source I/O thread context. */
1024 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1025     size_t delay, rlen, plen;
1026     pa_usec_t now, latency;
1027
1028     now = pa_rtclock_now();
1029     latency = pa_source_get_latency_within_thread(u->source_output->source);
1030     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1031
1032     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1033     rlen = pa_memblockq_get_length(u->source_memblockq);
1034     plen = pa_memblockq_get_length(u->sink_memblockq);
1035
1036     snapshot->source_now = now;
1037     snapshot->source_latency = latency;
1038     snapshot->source_delay = delay;
1039     snapshot->recv_counter = u->recv_counter;
1040     snapshot->rlen = rlen + u->sink_skip;
1041     snapshot->plen = plen + u->source_skip;
1042 }
1043
1044 /* Called from source I/O thread context. */
1045 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1046     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1047
1048     switch (code) {
1049
1050         case SOURCE_OUTPUT_MESSAGE_POST:
1051
1052             pa_source_output_assert_io_context(u->source_output);
1053
1054             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1055                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1056             else
1057                 pa_memblockq_flush_write(u->sink_memblockq, true);
1058
1059             u->recv_counter += (int64_t) chunk->length;
1060
1061             return 0;
1062
1063         case SOURCE_OUTPUT_MESSAGE_REWIND:
1064             pa_source_output_assert_io_context(u->source_output);
1065
1066             /* manipulate write index, never go past what we have */
1067             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1068                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, true);
1069             else
1070                 pa_memblockq_flush_write(u->sink_memblockq, true);
1071
1072             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1073
1074             u->recv_counter -= offset;
1075
1076             return 0;
1077
1078         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1079             struct snapshot *snapshot = (struct snapshot *) data;
1080
1081             source_output_snapshot_within_thread(u, snapshot);
1082             return 0;
1083         }
1084
1085         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1086             apply_diff_time(u, offset);
1087             return 0;
1088
1089     }
1090
1091     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1092 }
1093
1094 /* Called from sink I/O thread context. */
1095 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1096     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1097
1098     switch (code) {
1099
1100         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1101             size_t delay;
1102             pa_usec_t now, latency;
1103             struct snapshot *snapshot = (struct snapshot *) data;
1104
1105             pa_sink_input_assert_io_context(u->sink_input);
1106
1107             now = pa_rtclock_now();
1108             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1109             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1110
1111             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1112
1113             snapshot->sink_now = now;
1114             snapshot->sink_latency = latency;
1115             snapshot->sink_delay = delay;
1116             snapshot->send_counter = u->send_counter;
1117             return 0;
1118         }
1119     }
1120
1121     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1122 }
1123
1124 /* Called from sink I/O thread context. */
1125 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1126     struct userdata *u;
1127
1128     pa_sink_input_assert_ref(i);
1129     pa_assert_se(u = i->userdata);
1130
1131     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1132
1133     /* FIXME: Too small max_rewind:
1134      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1135     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1136     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1137 }
1138
1139 /* Called from source I/O thread context. */
1140 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1141     struct userdata *u;
1142
1143     pa_source_output_assert_ref(o);
1144     pa_assert_se(u = o->userdata);
1145
1146     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1147
1148     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1149 }
1150
1151 /* Called from sink I/O thread context. */
1152 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1153     struct userdata *u;
1154
1155     pa_sink_input_assert_ref(i);
1156     pa_assert_se(u = i->userdata);
1157
1158     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1159
1160     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1161 }
1162
1163 /* Called from sink I/O thread context. */
1164 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1165     struct userdata *u;
1166     pa_usec_t latency;
1167
1168     pa_sink_input_assert_ref(i);
1169     pa_assert_se(u = i->userdata);
1170
1171     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1172
1173     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1174 }
1175
1176 /* Called from source I/O thread context. */
1177 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1178     struct userdata *u;
1179     pa_usec_t latency;
1180
1181     pa_source_output_assert_ref(o);
1182     pa_assert_se(u = o->userdata);
1183
1184     latency = pa_source_get_requested_latency_within_thread(o->source);
1185
1186     pa_log_debug("Source output update requested latency %lld", (long long) latency);
1187 }
1188
1189 /* Called from sink I/O thread context. */
1190 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1191     struct userdata *u;
1192
1193     pa_sink_input_assert_ref(i);
1194     pa_assert_se(u = i->userdata);
1195
1196     pa_log_debug("Sink input update latency range %lld %lld",
1197         (long long) i->sink->thread_info.min_latency,
1198         (long long) i->sink->thread_info.max_latency);
1199
1200     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1201 }
1202
1203 /* Called from source I/O thread context. */
1204 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1205     struct userdata *u;
1206
1207     pa_source_output_assert_ref(o);
1208     pa_assert_se(u = o->userdata);
1209
1210     pa_log_debug("Source output update latency range %lld %lld",
1211         (long long) o->source->thread_info.min_latency,
1212         (long long) o->source->thread_info.max_latency);
1213
1214     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1215 }
1216
1217 /* Called from sink I/O thread context. */
1218 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1219     struct userdata *u;
1220
1221     pa_sink_input_assert_ref(i);
1222     pa_assert_se(u = i->userdata);
1223
1224     pa_log_debug("Sink input update fixed latency %lld",
1225         (long long) i->sink->thread_info.fixed_latency);
1226
1227     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1228 }
1229
1230 /* Called from source I/O thread context. */
1231 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1232     struct userdata *u;
1233
1234     pa_source_output_assert_ref(o);
1235     pa_assert_se(u = o->userdata);
1236
1237     pa_log_debug("Source output update fixed latency %lld",
1238         (long long) o->source->thread_info.fixed_latency);
1239
1240     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1241 }
1242
1243 /* Called from source I/O thread context. */
1244 static void source_output_attach_cb(pa_source_output *o) {
1245     struct userdata *u;
1246
1247     pa_source_output_assert_ref(o);
1248     pa_source_output_assert_io_context(o);
1249     pa_assert_se(u = o->userdata);
1250
1251     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1252     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1253     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1254     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1255
1256     pa_log_debug("Source output %d attach", o->index);
1257
1258     pa_source_attach_within_thread(u->source);
1259
1260     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1261             o->source->thread_info.rtpoll,
1262             PA_RTPOLL_LATE,
1263             u->asyncmsgq);
1264 }
1265
1266 /* Called from sink I/O thread context. */
1267 static void sink_input_attach_cb(pa_sink_input *i) {
1268     struct userdata *u;
1269
1270     pa_sink_input_assert_ref(i);
1271     pa_assert_se(u = i->userdata);
1272
1273     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1274     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1275
1276     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1277      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1278     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1279
1280     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1281      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1282      * HERE. SEE (6) */
1283     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1284
1285     /* FIXME: Too small max_rewind:
1286      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1287     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1288
1289     pa_log_debug("Sink input %d attach", i->index);
1290
1291     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1292             i->sink->thread_info.rtpoll,
1293             PA_RTPOLL_LATE,
1294             u->asyncmsgq);
1295
1296     pa_sink_attach_within_thread(u->sink);
1297 }
1298
1299 /* Called from source I/O thread context. */
1300 static void source_output_detach_cb(pa_source_output *o) {
1301     struct userdata *u;
1302
1303     pa_source_output_assert_ref(o);
1304     pa_source_output_assert_io_context(o);
1305     pa_assert_se(u = o->userdata);
1306
1307     pa_source_detach_within_thread(u->source);
1308     pa_source_set_rtpoll(u->source, NULL);
1309
1310     pa_log_debug("Source output %d detach", o->index);
1311
1312     if (u->rtpoll_item_read) {
1313         pa_rtpoll_item_free(u->rtpoll_item_read);
1314         u->rtpoll_item_read = NULL;
1315     }
1316 }
1317
1318 /* Called from sink I/O thread context. */
1319 static void sink_input_detach_cb(pa_sink_input *i) {
1320     struct userdata *u;
1321
1322     pa_sink_input_assert_ref(i);
1323     pa_assert_se(u = i->userdata);
1324
1325     pa_sink_detach_within_thread(u->sink);
1326
1327     pa_sink_set_rtpoll(u->sink, NULL);
1328
1329     pa_log_debug("Sink input %d detach", i->index);
1330
1331     if (u->rtpoll_item_write) {
1332         pa_rtpoll_item_free(u->rtpoll_item_write);
1333         u->rtpoll_item_write = NULL;
1334     }
1335 }
1336
1337 /* Called from source I/O thread context. */
1338 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1339     struct userdata *u;
1340
1341     pa_source_output_assert_ref(o);
1342     pa_source_output_assert_io_context(o);
1343     pa_assert_se(u = o->userdata);
1344
1345     pa_log_debug("Source output %d state %d", o->index, state);
1346 }
1347
1348 /* Called from sink I/O thread context. */
1349 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1350     struct userdata *u;
1351
1352     pa_sink_input_assert_ref(i);
1353     pa_assert_se(u = i->userdata);
1354
1355     pa_log_debug("Sink input %d state %d", i->index, state);
1356
1357     /* If we are added for the first time, ask for a rewinding so that
1358      * we are heard right-away. */
1359     if (PA_SINK_INPUT_IS_LINKED(state) &&
1360         i->thread_info.state == PA_SINK_INPUT_INIT) {
1361         pa_log_debug("Requesting rewind due to state change.");
1362         pa_sink_input_request_rewind(i, 0, false, true, true);
1363     }
1364 }
1365
1366 /* Called from main context. */
1367 static void source_output_kill_cb(pa_source_output *o) {
1368     struct userdata *u;
1369
1370     pa_source_output_assert_ref(o);
1371     pa_assert_ctl_context();
1372     pa_assert_se(u = o->userdata);
1373
1374     u->dead = true;
1375
1376     /* The order here matters! We first kill the source output, followed
1377      * by the source. That means the source callbacks must be protected
1378      * against an unconnected source output! */
1379     pa_source_output_unlink(u->source_output);
1380     pa_source_unlink(u->source);
1381
1382     pa_source_output_unref(u->source_output);
1383     u->source_output = NULL;
1384
1385     pa_source_unref(u->source);
1386     u->source = NULL;
1387
1388     pa_log_debug("Source output kill %d", o->index);
1389
1390     pa_module_unload_request(u->module, true);
1391 }
1392
1393 /* Called from main context */
1394 static void sink_input_kill_cb(pa_sink_input *i) {
1395     struct userdata *u;
1396
1397     pa_sink_input_assert_ref(i);
1398     pa_assert_se(u = i->userdata);
1399
1400     u->dead = true;
1401
1402     /* The order here matters! We first kill the sink input, followed
1403      * by the sink. That means the sink callbacks must be protected
1404      * against an unconnected sink input! */
1405     pa_sink_input_unlink(u->sink_input);
1406     pa_sink_unlink(u->sink);
1407
1408     pa_sink_input_unref(u->sink_input);
1409     u->sink_input = NULL;
1410
1411     pa_sink_unref(u->sink);
1412     u->sink = NULL;
1413
1414     pa_log_debug("Sink input kill %d", i->index);
1415
1416     pa_module_unload_request(u->module, true);
1417 }
1418
1419 /* Called from main context. */
1420 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1421     struct userdata *u;
1422
1423     pa_source_output_assert_ref(o);
1424     pa_assert_ctl_context();
1425     pa_assert_se(u = o->userdata);
1426
1427     if (u->dead || u->autoloaded)
1428         return false;
1429
1430     return (u->source != dest) && (u->sink != dest->monitor_of);
1431 }
1432
1433 /* Called from main context */
1434 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1435     struct userdata *u;
1436
1437     pa_sink_input_assert_ref(i);
1438     pa_assert_se(u = i->userdata);
1439
1440     if (u->dead || u->autoloaded)
1441         return false;
1442
1443     return u->sink != dest;
1444 }
1445
1446 /* Called from main context. */
1447 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1448     struct userdata *u;
1449
1450     pa_source_output_assert_ref(o);
1451     pa_assert_ctl_context();
1452     pa_assert_se(u = o->userdata);
1453
1454     if (dest) {
1455         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1456         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1457     } else
1458         pa_source_set_asyncmsgq(u->source, NULL);
1459
1460     if (u->source_auto_desc && dest) {
1461         const char *y, *z;
1462         pa_proplist *pl;
1463
1464         pl = pa_proplist_new();
1465         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1466         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1467         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1468                 y ? y : u->sink_input->sink->name);
1469
1470         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1471         pa_proplist_free(pl);
1472     }
1473 }
1474
1475 /* Called from main context */
1476 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1477     struct userdata *u;
1478
1479     pa_sink_input_assert_ref(i);
1480     pa_assert_se(u = i->userdata);
1481
1482     if (dest) {
1483         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1484         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1485     } else
1486         pa_sink_set_asyncmsgq(u->sink, NULL);
1487
1488     if (u->sink_auto_desc && dest) {
1489         const char *y, *z;
1490         pa_proplist *pl;
1491
1492         pl = pa_proplist_new();
1493         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1494         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1495         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1496                          y ? y : u->source_output->source->name);
1497
1498         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1499         pa_proplist_free(pl);
1500     }
1501 }
1502
1503 /* Called from main context */
1504 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1505     struct userdata *u;
1506
1507     pa_sink_input_assert_ref(i);
1508     pa_assert_se(u = i->userdata);
1509
1510     pa_sink_volume_changed(u->sink, &i->volume);
1511 }
1512
1513 /* Called from main context */
1514 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1515     struct userdata *u;
1516
1517     pa_sink_input_assert_ref(i);
1518     pa_assert_se(u = i->userdata);
1519
1520     pa_sink_mute_changed(u->sink, i->muted);
1521 }
1522
1523 /* Called from main context */
1524 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1525     struct pa_echo_canceller_msg *msg;
1526     struct userdata *u;
1527
1528     pa_assert(o);
1529
1530     msg = PA_ECHO_CANCELLER_MSG(o);
1531     u = msg->userdata;
1532
1533     switch (code) {
1534         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1535             pa_cvolume *v = (pa_cvolume *) userdata;
1536
1537             if (u->use_volume_sharing)
1538                 pa_source_set_volume(u->source, v, true, false);
1539             else
1540                 pa_source_output_set_volume(u->source_output, v, false, true);
1541
1542             break;
1543         }
1544
1545         default:
1546             pa_assert_not_reached();
1547             break;
1548     }
1549
1550     return 0;
1551 }
1552
1553 /* Called by the canceller, so source I/O thread context. */
1554 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1555     *v = ec->msg->userdata->thread_info.current_volume;
1556 }
1557
1558 /* Called by the canceller, so source I/O thread context. */
1559 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1560     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1561         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1562
1563         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1564                 pa_xfree);
1565     }
1566 }
1567
1568 uint32_t pa_echo_canceller_blocksize_power2(unsigned rate, unsigned ms) {
1569     unsigned nframes = (rate * ms) / 1000;
1570     uint32_t y = 1 << ((8 * sizeof(uint32_t)) - 2);
1571
1572     assert(rate >= 4000);
1573     assert(ms >= 1);
1574
1575     /* nframes should be a power of 2, round down to nearest power of two */
1576     while (y > nframes)
1577         y >>= 1;
1578
1579     assert(y >= 1);
1580     return y;
1581 }
1582
1583 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1584     if (pa_streq(method, "null"))
1585         return PA_ECHO_CANCELLER_NULL;
1586 #ifdef HAVE_SPEEX
1587     if (pa_streq(method, "speex"))
1588         return PA_ECHO_CANCELLER_SPEEX;
1589 #endif
1590 #ifdef HAVE_ADRIAN_EC
1591     if (pa_streq(method, "adrian"))
1592         return PA_ECHO_CANCELLER_ADRIAN;
1593 #endif
1594 #ifdef HAVE_WEBRTC
1595     if (pa_streq(method, "webrtc"))
1596         return PA_ECHO_CANCELLER_WEBRTC;
1597 #endif
1598     return PA_ECHO_CANCELLER_INVALID;
1599 }
1600
1601 static int extension_cb(pa_native_protocol *p, pa_module *m, pa_native_connection *c, uint32_t tag, pa_tagstruct *t) {
1602     uint32_t command;
1603     uint32_t value;
1604     pa_tagstruct *reply = NULL;
1605     pa_assert(p);
1606     pa_assert(m);
1607     pa_assert(c);
1608     pa_assert(t);
1609
1610     if (pa_tagstruct_getu32(t, &command) < 0)
1611     goto fail;
1612
1613     reply = pa_tagstruct_new(NULL, 0);
1614     pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1615     pa_tagstruct_putu32(reply, tag);
1616
1617     switch (command) {
1618         case AEC_SET_VOLUME: {
1619             pa_tagstruct_getu32(t,&value);
1620             pa_log_debug("AEC_SET_VOLUME in echo cancel = %d",value);
1621         break;
1622     }
1623         case AEC_SET_DEVICE: {
1624             pa_tagstruct_getu32(t,&value);
1625             pa_log_debug("AEC_SET_DEVICE in echo cancel = %d",value);
1626         break;
1627     }
1628     default:
1629         goto fail;
1630     }
1631     pa_pstream_send_tagstruct(pa_native_connection_get_pstream(c), reply);
1632     return 0;
1633
1634 fail:
1635     return -1;
1636 }
1637
1638 /* Common initialisation bits between module-echo-cancel and the standalone
1639  * test program.
1640  *
1641  * Called from main context. */
1642 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1643     const char *ec_string;
1644     pa_echo_canceller_method_t ec_method;
1645
1646     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1647         pa_log("Invalid sample format specification or channel map");
1648         goto fail;
1649     }
1650
1651     u->ec = pa_xnew0(pa_echo_canceller, 1);
1652     if (!u->ec) {
1653         pa_log("Failed to alloc echo canceller");
1654         goto fail;
1655     }
1656
1657     ec_string = pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER);
1658     if ((ec_method = get_ec_method_from_string(ec_string)) < 0) {
1659         pa_log("Invalid echo canceller implementation '%s'", ec_string);
1660         goto fail;
1661     }
1662
1663     pa_log_info("Using AEC engine: %s", ec_string);
1664
1665     u->ec->init = ec_table[ec_method].init;
1666     u->ec->play = ec_table[ec_method].play;
1667     u->ec->record = ec_table[ec_method].record;
1668     u->ec->set_drift = ec_table[ec_method].set_drift;
1669     u->ec->run = ec_table[ec_method].run;
1670     u->ec->done = ec_table[ec_method].done;
1671
1672     return 0;
1673
1674 fail:
1675     return -1;
1676 }
1677
1678 /* Called from main context. */
1679 int pa__init(pa_module*m) {
1680     struct userdata *u;
1681     pa_sample_spec source_output_ss, source_ss, sink_ss;
1682     pa_channel_map source_output_map, source_map, sink_map;
1683     pa_modargs *ma;
1684     pa_source *source_master=NULL;
1685     pa_sink *sink_master=NULL;
1686     pa_source_output_new_data source_output_data;
1687     pa_sink_input_new_data sink_input_data;
1688     pa_source_new_data source_data;
1689     pa_sink_new_data sink_data;
1690     pa_memchunk silence;
1691     uint32_t temp;
1692     uint32_t nframes = 0;
1693
1694     pa_assert(m);
1695
1696     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1697         pa_log("Failed to parse module arguments.");
1698         goto fail;
1699     }
1700
1701     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1702         pa_log("Master source not found");
1703         goto fail;
1704     }
1705     pa_assert(source_master);
1706
1707     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1708         pa_log("Master sink not found");
1709         goto fail;
1710     }
1711     pa_assert(sink_master);
1712
1713     if (source_master->monitor_of == sink_master) {
1714         pa_log("Can't cancel echo between a sink and its monitor");
1715         goto fail;
1716     }
1717
1718     source_ss = source_master->sample_spec;
1719     source_ss.rate = DEFAULT_RATE;
1720     source_ss.channels = DEFAULT_CHANNELS;
1721     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1722
1723     sink_ss = sink_master->sample_spec;
1724     sink_map = sink_master->channel_map;
1725
1726     u = pa_xnew0(struct userdata, 1);
1727     if (!u) {
1728         pa_log("Failed to alloc userdata");
1729         goto fail;
1730     }
1731     u->core = m->core;
1732     u->module = m;
1733     m->userdata = u;
1734     u->dead = false;
1735
1736     u->use_volume_sharing = true;
1737     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1738         pa_log("use_volume_sharing= expects a boolean argument");
1739         goto fail;
1740     }
1741
1742     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1743     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1744         pa_log("Failed to parse adjust_time value");
1745         goto fail;
1746     }
1747
1748     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1749         u->adjust_time = temp * PA_USEC_PER_SEC;
1750     else
1751         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1752
1753     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1754     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1755         pa_log("Failed to parse adjust_threshold value");
1756         goto fail;
1757     }
1758
1759     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1760         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1761     else
1762         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1763
1764     u->save_aec = DEFAULT_SAVE_AEC;
1765     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1766         pa_log("Failed to parse save_aec value");
1767         goto fail;
1768     }
1769
1770     u->autoloaded = DEFAULT_AUTOLOADED;
1771     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1772         pa_log("Failed to parse autoloaded value");
1773         goto fail;
1774     }
1775
1776     if (init_common(ma, u, &source_ss, &source_map) < 0)
1777         goto fail;
1778
1779     u->asyncmsgq = pa_asyncmsgq_new(0);
1780     u->need_realign = true;
1781
1782     source_output_ss = source_ss;
1783     source_output_map = source_map;
1784
1785     if (sink_ss.rate != source_ss.rate) {
1786         pa_log_info("Sample rates of play and out stream differ. Adjusting rate of play stream.");
1787         sink_ss.rate = source_ss.rate;
1788     }
1789
1790     pa_assert(u->ec->init);
1791     if (!u->ec->init(u->core, u->ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes, pa_modargs_get_value(ma, "aec_args", NULL))) {
1792         pa_log("Failed to init AEC engine");
1793         goto fail;
1794     }
1795
1796     pa_assert(source_output_ss.rate == source_ss.rate);
1797     pa_assert(sink_ss.rate == source_ss.rate);
1798
1799     u->source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
1800     u->source_blocksize = nframes * pa_frame_size(&source_ss);
1801     u->sink_blocksize = nframes * pa_frame_size(&sink_ss);
1802
1803     if (u->ec->params.drift_compensation)
1804         pa_assert(u->ec->set_drift);
1805
1806     /* Create source */
1807     pa_source_new_data_init(&source_data);
1808     source_data.driver = __FILE__;
1809     source_data.module = m;
1810     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1811         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1812     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1813     pa_source_new_data_set_channel_map(&source_data, &source_map);
1814     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1815     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1816     if (!u->autoloaded)
1817         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1818
1819     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1820         pa_log("Invalid properties");
1821         pa_source_new_data_done(&source_data);
1822         goto fail;
1823     }
1824
1825     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1826         const char *y, *z;
1827
1828         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1829         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1830         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1831                 z ? z : source_master->name, y ? y : sink_master->name);
1832     }
1833
1834     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1835                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1836     pa_source_new_data_done(&source_data);
1837
1838     if (!u->source) {
1839         pa_log("Failed to create source.");
1840         goto fail;
1841     }
1842
1843     u->source->parent.process_msg = source_process_msg_cb;
1844     u->source->set_state = source_set_state_cb;
1845     u->source->update_requested_latency = source_update_requested_latency_cb;
1846     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1847     if (!u->use_volume_sharing) {
1848         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1849         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1850         pa_source_enable_decibel_volume(u->source, true);
1851     }
1852     u->source->userdata = u;
1853
1854     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1855
1856     /* Create sink */
1857     pa_sink_new_data_init(&sink_data);
1858     sink_data.driver = __FILE__;
1859     sink_data.module = m;
1860     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1861         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1862     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1863     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1864     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1865     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1866     if (!u->autoloaded)
1867         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1868
1869     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1870         pa_log("Invalid properties");
1871         pa_sink_new_data_done(&sink_data);
1872         goto fail;
1873     }
1874
1875     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1876         const char *y, *z;
1877
1878         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1879         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1880         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1881                 z ? z : sink_master->name, y ? y : source_master->name);
1882     }
1883
1884     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1885                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1886     pa_sink_new_data_done(&sink_data);
1887
1888     if (!u->sink) {
1889         pa_log("Failed to create sink.");
1890         goto fail;
1891     }
1892
1893     u->sink->parent.process_msg = sink_process_msg_cb;
1894     u->sink->set_state = sink_set_state_cb;
1895     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1896     u->sink->request_rewind = sink_request_rewind_cb;
1897     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1898     if (!u->use_volume_sharing) {
1899         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1900         pa_sink_enable_decibel_volume(u->sink, true);
1901     }
1902     u->sink->userdata = u;
1903
1904     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1905
1906     /* Create source output */
1907     pa_source_output_new_data_init(&source_output_data);
1908     source_output_data.driver = __FILE__;
1909     source_output_data.module = m;
1910     pa_source_output_new_data_set_source(&source_output_data, source_master, false);
1911     source_output_data.destination_source = u->source;
1912
1913     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1914     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1915     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_output_ss);
1916     pa_source_output_new_data_set_channel_map(&source_output_data, &source_output_map);
1917
1918     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1919     pa_source_output_new_data_done(&source_output_data);
1920
1921     if (!u->source_output)
1922         goto fail;
1923
1924     u->source_output->parent.process_msg = source_output_process_msg_cb;
1925     u->source_output->push = source_output_push_cb;
1926     u->source_output->process_rewind = source_output_process_rewind_cb;
1927     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1928     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1929     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1930     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1931     u->source_output->kill = source_output_kill_cb;
1932     u->source_output->attach = source_output_attach_cb;
1933     u->source_output->detach = source_output_detach_cb;
1934     u->source_output->state_change = source_output_state_change_cb;
1935     u->source_output->may_move_to = source_output_may_move_to_cb;
1936     u->source_output->moving = source_output_moving_cb;
1937     u->source_output->userdata = u;
1938
1939     u->source->output_from_master = u->source_output;
1940
1941     /* Create sink input */
1942     pa_sink_input_new_data_init(&sink_input_data);
1943     sink_input_data.driver = __FILE__;
1944     sink_input_data.module = m;
1945     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, false);
1946     sink_input_data.origin_sink = u->sink;
1947     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1948     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1949     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1950     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1951     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1952
1953     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1954     pa_sink_input_new_data_done(&sink_input_data);
1955
1956     if (!u->sink_input)
1957         goto fail;
1958
1959     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1960     u->sink_input->pop = sink_input_pop_cb;
1961     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1962     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1963     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1964     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1965     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1966     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1967     u->sink_input->kill = sink_input_kill_cb;
1968     u->sink_input->attach = sink_input_attach_cb;
1969     u->sink_input->detach = sink_input_detach_cb;
1970     u->sink_input->state_change = sink_input_state_change_cb;
1971     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1972     u->sink_input->moving = sink_input_moving_cb;
1973     if (!u->use_volume_sharing)
1974         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1975     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1976     u->sink_input->userdata = u;
1977
1978     u->sink->input_to_master = u->sink_input;
1979
1980     pa_sink_input_get_silence(u->sink_input, &silence);
1981
1982     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1983         &source_output_ss, 1, 1, 0, &silence);
1984     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1985         &sink_ss, 0, 1, 0, &silence);
1986
1987     pa_memblock_unref(silence.memblock);
1988
1989     if (!u->source_memblockq || !u->sink_memblockq) {
1990         pa_log("Failed to create memblockq.");
1991         goto fail;
1992     }
1993
1994     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1995         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1996     else if (u->ec->params.drift_compensation) {
1997         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1998         u->adjust_time = 0;
1999         /* Perform resync just once to give the canceller a leg up */
2000         pa_atomic_store(&u->request_resync, 1);
2001     }
2002
2003     if (u->save_aec) {
2004         pa_log("Creating AEC files in /tmp");
2005         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
2006         if (u->captured_file == NULL)
2007             perror ("fopen failed");
2008         u->played_file = fopen("/tmp/aec_play.sw", "wb");
2009         if (u->played_file == NULL)
2010             perror ("fopen failed");
2011         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
2012         if (u->canceled_file == NULL)
2013             perror ("fopen failed");
2014         if (u->ec->params.drift_compensation) {
2015             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
2016             if (u->drift_file == NULL)
2017                 perror ("fopen failed");
2018         }
2019     }
2020
2021     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
2022     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
2023     u->ec->msg->userdata = u;
2024
2025     u->thread_info.current_volume = u->source->reference_volume;
2026
2027     u->protocol = pa_native_protocol_get(m->core);
2028     pa_native_protocol_install_ext(u->protocol, m, extension_cb);
2029
2030     pa_sink_put(u->sink);
2031     pa_source_put(u->source);
2032
2033     pa_sink_input_put(u->sink_input);
2034     pa_source_output_put(u->source_output);
2035     pa_modargs_free(ma);
2036
2037     return 0;
2038
2039 fail:
2040     if (ma)
2041         pa_modargs_free(ma);
2042
2043     pa__done(m);
2044
2045     return -1;
2046 }
2047
2048 /* Called from main context. */
2049 int pa__get_n_used(pa_module *m) {
2050     struct userdata *u;
2051
2052     pa_assert(m);
2053     pa_assert_se(u = m->userdata);
2054
2055     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
2056 }
2057
2058 /* Called from main context. */
2059 void pa__done(pa_module*m) {
2060     struct userdata *u;
2061
2062     pa_assert(m);
2063
2064     if (!(u = m->userdata))
2065         return;
2066
2067     u->dead = true;
2068
2069     /* See comments in source_output_kill_cb() above regarding
2070      * destruction order! */
2071
2072     if (u->time_event)
2073         u->core->mainloop->time_free(u->time_event);
2074
2075     if (u->source_output)
2076         pa_source_output_unlink(u->source_output);
2077     if (u->sink_input)
2078         pa_sink_input_unlink(u->sink_input);
2079
2080     if (u->source)
2081         pa_source_unlink(u->source);
2082     if (u->sink)
2083         pa_sink_unlink(u->sink);
2084
2085     if (u->source_output)
2086         pa_source_output_unref(u->source_output);
2087     if (u->sink_input)
2088         pa_sink_input_unref(u->sink_input);
2089
2090     if (u->source)
2091         pa_source_unref(u->source);
2092     if (u->sink)
2093         pa_sink_unref(u->sink);
2094
2095     if (u->source_memblockq)
2096         pa_memblockq_free(u->source_memblockq);
2097     if (u->sink_memblockq)
2098         pa_memblockq_free(u->sink_memblockq);
2099
2100     if (u->ec) {
2101         if (u->ec->done)
2102             u->ec->done(u->ec);
2103
2104         pa_xfree(u->ec);
2105     }
2106
2107     if (u->protocol) {
2108         pa_native_protocol_remove_ext(u->protocol, m);
2109         pa_native_protocol_unref(u->protocol);
2110     }
2111
2112     if (u->asyncmsgq)
2113         pa_asyncmsgq_unref(u->asyncmsgq);
2114
2115     if (u->save_aec) {
2116         if (u->played_file)
2117             fclose(u->played_file);
2118         if (u->captured_file)
2119             fclose(u->captured_file);
2120         if (u->canceled_file)
2121             fclose(u->canceled_file);
2122         if (u->drift_file)
2123             fclose(u->drift_file);
2124     }
2125
2126     pa_xfree(u);
2127 }
2128
2129 #ifdef ECHO_CANCEL_TEST
2130 /*
2131  * Stand-alone test program for running in the canceller on pre-recorded files.
2132  */
2133 int main(int argc, char* argv[]) {
2134     struct userdata u;
2135     pa_sample_spec source_output_ss, source_ss, sink_ss;
2136     pa_channel_map source_output_map, source_map, sink_map;
2137     pa_modargs *ma = NULL;
2138     uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2139     int unused PA_GCC_UNUSED;
2140     int ret = 0, i;
2141     char c;
2142     float drift;
2143     uint32_t nframes;
2144
2145     if (!getenv("MAKE_CHECK"))
2146         pa_log_set_level(PA_LOG_DEBUG);
2147
2148     pa_memzero(&u, sizeof(u));
2149
2150     if (argc < 4 || argc > 7) {
2151         goto usage;
2152     }
2153
2154     u.captured_file = fopen(argv[2], "rb");
2155     if (u.captured_file == NULL) {
2156         perror ("Could not open capture file");
2157         goto fail;
2158     }
2159     u.played_file = fopen(argv[1], "rb");
2160     if (u.played_file == NULL) {
2161         perror ("Could not open play file");
2162         goto fail;
2163     }
2164     u.canceled_file = fopen(argv[3], "wb");
2165     if (u.canceled_file == NULL) {
2166         perror ("Could not open canceled file");
2167         goto fail;
2168     }
2169
2170     u.core = pa_xnew0(pa_core, 1);
2171     u.core->cpu_info.cpu_type = PA_CPU_X86;
2172     u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2173
2174     if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2175         pa_log("Failed to parse module arguments.");
2176         goto fail;
2177     }
2178
2179     source_ss.format = PA_SAMPLE_S16LE;
2180     source_ss.rate = DEFAULT_RATE;
2181     source_ss.channels = DEFAULT_CHANNELS;
2182     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2183
2184     sink_ss.format = PA_SAMPLE_S16LE;
2185     sink_ss.rate = DEFAULT_RATE;
2186     sink_ss.channels = DEFAULT_CHANNELS;
2187     pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2188
2189     if (init_common(ma, &u, &source_ss, &source_map) < 0)
2190         goto fail;
2191
2192     source_output_ss = source_ss;
2193     source_output_map = source_map;
2194
2195     if (!u.ec->init(u.core, u.ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes,
2196                      pa_modargs_get_value(ma, "aec_args", NULL))) {
2197         pa_log("Failed to init AEC engine");
2198         goto fail;
2199     }
2200     u.source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
2201     u.source_blocksize = nframes * pa_frame_size(&source_ss);
2202     u.sink_blocksize = nframes * pa_frame_size(&sink_ss);
2203
2204     if (u.ec->params.drift_compensation) {
2205         if (argc < 6) {
2206             pa_log("Drift compensation enabled but drift file not specified");
2207             goto fail;
2208         }
2209
2210         u.drift_file = fopen(argv[5], "rt");
2211
2212         if (u.drift_file == NULL) {
2213             perror ("Could not open drift file");
2214             goto fail;
2215         }
2216     }
2217
2218     rdata = pa_xmalloc(u.source_output_blocksize);
2219     pdata = pa_xmalloc(u.sink_blocksize);
2220     cdata = pa_xmalloc(u.source_blocksize);
2221
2222     if (!u.ec->params.drift_compensation) {
2223         while (fread(rdata, u.source_output_blocksize, 1, u.captured_file) > 0) {
2224             if (fread(pdata, u.sink_blocksize, 1, u.played_file) == 0) {
2225                 perror("Played file ended before captured file");
2226                 goto fail;
2227             }
2228
2229             u.ec->run(u.ec, rdata, pdata, cdata);
2230
2231             unused = fwrite(cdata, u.source_blocksize, 1, u.canceled_file);
2232         }
2233     } else {
2234         while (fscanf(u.drift_file, "%c", &c) > 0) {
2235             switch (c) {
2236                 case 'd':
2237                     if (!fscanf(u.drift_file, "%a", &drift)) {
2238                         perror("Drift file incomplete");
2239                         goto fail;
2240                     }
2241
2242                     u.ec->set_drift(u.ec, drift);
2243
2244                     break;
2245
2246                 case 'c':
2247                     if (!fscanf(u.drift_file, "%d", &i)) {
2248                         perror("Drift file incomplete");
2249                         goto fail;
2250                     }
2251
2252                     if (fread(rdata, i, 1, u.captured_file) <= 0) {
2253                         perror("Captured file ended prematurely");
2254                         goto fail;
2255                     }
2256
2257                     u.ec->record(u.ec, rdata, cdata);
2258
2259                     unused = fwrite(cdata, i, 1, u.canceled_file);
2260
2261                     break;
2262
2263                 case 'p':
2264                     if (!fscanf(u.drift_file, "%d", &i)) {
2265                         perror("Drift file incomplete");
2266                         goto fail;
2267                     }
2268
2269                     if (fread(pdata, i, 1, u.played_file) <= 0) {
2270                         perror("Played file ended prematurely");
2271                         goto fail;
2272                     }
2273
2274                     u.ec->play(u.ec, pdata);
2275
2276                     break;
2277             }
2278         }
2279
2280         if (fread(rdata, i, 1, u.captured_file) > 0)
2281             pa_log("All capture data was not consumed");
2282         if (fread(pdata, i, 1, u.played_file) > 0)
2283             pa_log("All playback data was not consumed");
2284     }
2285
2286     u.ec->done(u.ec);
2287
2288 out:
2289     if (u.captured_file)
2290         fclose(u.captured_file);
2291     if (u.played_file)
2292         fclose(u.played_file);
2293     if (u.canceled_file)
2294         fclose(u.canceled_file);
2295     if (u.drift_file)
2296         fclose(u.drift_file);
2297
2298     pa_xfree(rdata);
2299     pa_xfree(pdata);
2300     pa_xfree(cdata);
2301
2302     pa_xfree(u.ec);
2303     pa_xfree(u.core);
2304
2305     if (ma)
2306         pa_modargs_free(ma);
2307
2308     return ret;
2309
2310 usage:
2311     pa_log("Usage: %s play_file rec_file out_file [module args] [drift_file]", argv[0]);
2312
2313 fail:
2314     ret = -1;
2315     goto out;
2316 }
2317 #endif /* ECHO_CANCEL_TEST */