Enhance for echo cancel - samsung
[platform/upstream/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include <pulsecore/protocol-native.h>
57 #include <pulsecore/pstream-util.h>
58
59 #include "module-echo-cancel-symdef.h"
60
61 PA_MODULE_AUTHOR("Wim Taymans");
62 PA_MODULE_DESCRIPTION("Echo Cancellation");
63 PA_MODULE_VERSION(PACKAGE_VERSION);
64 PA_MODULE_LOAD_ONCE(false);
65 PA_MODULE_USAGE(
66         _("source_name=<name for the source> "
67           "source_properties=<properties for the source> "
68           "source_master=<name of source to filter> "
69           "sink_name=<name for the sink> "
70           "sink_properties=<properties for the sink> "
71           "sink_master=<name of sink to filter> "
72           "adjust_time=<how often to readjust rates in s> "
73           "adjust_threshold=<how much drift to readjust after in ms> "
74           "format=<sample format> "
75           "rate=<sample rate> "
76           "channels=<number of channels> "
77           "channel_map=<channel map> "
78           "aec_method=<implementation to use> "
79           "aec_args=<parameters for the AEC engine> "
80           "save_aec=<save AEC data in /tmp> "
81           "autoloaded=<set if this module is being loaded automatically> "
82           "use_volume_sharing=<yes or no> "
83         ));
84
85 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
86 typedef enum {
87     PA_ECHO_CANCELLER_INVALID = -1,
88     PA_ECHO_CANCELLER_NULL,
89 #ifdef HAVE_SPEEX
90     PA_ECHO_CANCELLER_SPEEX,
91 #endif
92 #ifdef HAVE_ADRIAN_EC
93     PA_ECHO_CANCELLER_ADRIAN,
94 #endif
95 #ifdef HAVE_WEBRTC
96     PA_ECHO_CANCELLER_WEBRTC,
97 #endif
98 } pa_echo_canceller_method_t;
99
100 enum {
101         AEC_SET_VOLUME,
102         AEC_SET_DEVICE,
103 };
104
105 #ifdef HAVE_WEBRTC
106 #define DEFAULT_ECHO_CANCELLER "webrtc"
107 #else
108 #define DEFAULT_ECHO_CANCELLER "speex"
109 #endif
110
111 static const pa_echo_canceller ec_table[] = {
112     {
113         /* Null, Dummy echo canceller (just copies data) */
114         .init                   = pa_null_ec_init,
115         .run                    = pa_null_ec_run,
116         .done                   = pa_null_ec_done,
117     },
118 #ifdef HAVE_SPEEX
119     {
120         /* Speex */
121         .init                   = pa_speex_ec_init,
122         .run                    = pa_speex_ec_run,
123         .done                   = pa_speex_ec_done,
124     },
125 #endif
126 #ifdef HAVE_ADRIAN_EC
127     {
128         /* Adrian Andre's NLMS implementation */
129         .init                   = pa_adrian_ec_init,
130         .run                    = pa_adrian_ec_run,
131         .done                   = pa_adrian_ec_done,
132     },
133 #endif
134 #ifdef HAVE_WEBRTC
135     {
136         /* WebRTC's audio processing engine */
137         .init                   = pa_webrtc_ec_init,
138         .play                   = pa_webrtc_ec_play,
139         .record                 = pa_webrtc_ec_record,
140         .set_drift              = pa_webrtc_ec_set_drift,
141         .run                    = pa_webrtc_ec_run,
142         .done                   = pa_webrtc_ec_done,
143     },
144 #endif
145 };
146
147 #define DEFAULT_RATE 32000
148 #define DEFAULT_CHANNELS 1
149 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
150 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
151 #define DEFAULT_SAVE_AEC false
152 #define DEFAULT_AUTOLOADED false
153
154 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
155
156 /* Can only be used in main context */
157 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
158                       (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
159
160 /* This module creates a new (virtual) source and sink.
161  *
162  * The data sent to the new sink is kept in a memblockq before being
163  * forwarded to the real sink_master.
164  *
165  * Data read from source_master is matched against the saved sink data and
166  * echo canceled data is then pushed onto the new source.
167  *
168  * Both source and sink masters have their own threads to push/pull data
169  * respectively. We however perform all our actions in the source IO thread.
170  * To do this we send all played samples to the source IO thread where they
171  * are then pushed into the memblockq.
172  *
173  * Alignment is performed in two steps:
174  *
175  * 1) when something happens that requires quick adjustment of the alignment of
176  *    capture and playback samples, we perform a resync. This adjusts the
177  *    position in the playback memblock to the requested sample. Quick
178  *    adjustments include moving the playback samples before the capture
179  *    samples (because else the echo canceler does not work) or when the
180  *    playback pointer drifts too far away.
181  *
182  * 2) periodically check the difference between capture and playback. We use a
183  *    low and high watermark for adjusting the alignment. Playback should always
184  *    be before capture and the difference should not be bigger than one frame
185  *    size. We would ideally like to resample the sink_input but most driver
186  *    don't give enough accuracy to be able to do that right now.
187  */
188
189 struct userdata;
190
191 struct pa_echo_canceller_msg {
192     pa_msgobject parent;
193     struct userdata *userdata;
194 };
195
196 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
197 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
198
199 struct snapshot {
200     pa_usec_t sink_now;
201     pa_usec_t sink_latency;
202     size_t sink_delay;
203     int64_t send_counter;
204
205     pa_usec_t source_now;
206     pa_usec_t source_latency;
207     size_t source_delay;
208     int64_t recv_counter;
209     size_t rlen;
210     size_t plen;
211 };
212
213 struct userdata {
214     pa_core *core;
215     pa_module *module;
216
217     bool autoloaded;
218     bool dead;
219     bool save_aec;
220
221     pa_echo_canceller *ec;
222     uint32_t source_output_blocksize;
223     uint32_t source_blocksize;
224     uint32_t sink_blocksize;
225
226     bool need_realign;
227
228     /* to wakeup the source I/O thread */
229     pa_asyncmsgq *asyncmsgq;
230     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
231
232     pa_source *source;
233     bool source_auto_desc;
234     pa_source_output *source_output;
235     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
236     size_t source_skip;
237
238     pa_sink *sink;
239     bool sink_auto_desc;
240     pa_sink_input *sink_input;
241     pa_memblockq *sink_memblockq;
242     int64_t send_counter;          /* updated in sink IO thread */
243     int64_t recv_counter;
244     size_t sink_skip;
245
246     /* Bytes left over from previous iteration */
247     size_t sink_rem;
248     size_t source_rem;
249
250     pa_atomic_t request_resync;
251
252     pa_time_event *time_event;
253     pa_usec_t adjust_time;
254     int adjust_threshold;
255
256     FILE *captured_file;
257     FILE *played_file;
258     FILE *canceled_file;
259     FILE *drift_file;
260
261     bool use_volume_sharing;
262
263     struct {
264         pa_cvolume current_volume;
265     } thread_info;
266
267     pa_native_protocol *protocol;
268 };
269
270 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
271
272 static const char* const valid_modargs[] = {
273     "source_name",
274     "source_properties",
275     "source_master",
276     "sink_name",
277     "sink_properties",
278     "sink_master",
279     "adjust_time",
280     "adjust_threshold",
281     "format",
282     "rate",
283     "channels",
284     "channel_map",
285     "aec_method",
286     "aec_args",
287     "save_aec",
288     "autoloaded",
289     "use_volume_sharing",
290     NULL
291 };
292
293 enum {
294     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
295     SOURCE_OUTPUT_MESSAGE_REWIND,
296     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
297     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
298 };
299
300 enum {
301     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
302 };
303
304 enum {
305     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
306 };
307
308 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
309     int64_t diff_time, buffer_latency;
310     pa_usec_t plen, rlen, source_delay, sink_delay, recv_counter, send_counter;
311
312     /* get latency difference between playback and record */
313     plen = pa_bytes_to_usec(snapshot->plen, &u->sink_input->sample_spec);
314     rlen = pa_bytes_to_usec(snapshot->rlen, &u->source_output->sample_spec);
315     if (plen > rlen)
316         buffer_latency = plen - rlen;
317     else
318         buffer_latency = 0;
319
320     source_delay = pa_bytes_to_usec(snapshot->source_delay, &u->source_output->sample_spec);
321     sink_delay = pa_bytes_to_usec(snapshot->sink_delay, &u->sink_input->sample_spec);
322     buffer_latency += source_delay + sink_delay;
323
324     /* add the latency difference due to samples not yet transferred */
325     send_counter = pa_bytes_to_usec(snapshot->send_counter, &u->sink->sample_spec);
326     recv_counter = pa_bytes_to_usec(snapshot->recv_counter, &u->sink->sample_spec);
327     if (recv_counter <= send_counter)
328         buffer_latency += (int64_t) (send_counter - recv_counter);
329     else
330         buffer_latency += PA_CLIP_SUB(buffer_latency, (int64_t) (recv_counter - send_counter));
331
332     /* capture and playback are perfectly aligned when diff_time is 0 */
333     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
334           (snapshot->source_now - snapshot->source_latency);
335
336     pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
337         (long long) snapshot->sink_latency,
338         (long long) buffer_latency, (long long) snapshot->source_latency,
339         (long long) source_delay, (long long) sink_delay,
340         (long long) (send_counter - recv_counter),
341         (long long) (snapshot->sink_now - snapshot->source_now));
342
343     return diff_time;
344 }
345
346 /* Called from main context */
347 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
348     struct userdata *u = userdata;
349     uint32_t old_rate, base_rate, new_rate;
350     int64_t diff_time;
351     /*size_t fs*/
352     struct snapshot latency_snapshot;
353
354     pa_assert(u);
355     pa_assert(a);
356     pa_assert(u->time_event == e);
357     pa_assert_ctl_context();
358
359     if (!IS_ACTIVE(u))
360         return;
361
362     /* update our snapshots */
363     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
364     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
365
366     /* calculate drift between capture and playback */
367     diff_time = calc_diff(u, &latency_snapshot);
368
369     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
370     old_rate = u->sink_input->sample_spec.rate;
371     base_rate = u->source_output->sample_spec.rate;
372
373     if (diff_time < 0) {
374         /* recording before playback, we need to adjust quickly. The echo
375          * canceler does not work in this case. */
376         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
377             NULL, diff_time, NULL, NULL);
378         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
379         new_rate = base_rate;
380     }
381     else {
382         if (diff_time > u->adjust_threshold) {
383             /* diff too big, quickly adjust */
384             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
385                 NULL, diff_time, NULL, NULL);
386         }
387
388         /* recording behind playback, we need to slowly adjust the rate to match */
389         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
390
391         /* assume equal samplerates for now */
392         new_rate = base_rate;
393     }
394
395     /* make sure we don't make too big adjustments because that sounds horrible */
396     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
397         new_rate = base_rate;
398
399     if (new_rate != old_rate) {
400         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
401
402         pa_sink_input_set_rate(u->sink_input, new_rate);
403     }
404
405     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
406 }
407
408 /* Called from source I/O thread context */
409 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
410     struct userdata *u = PA_SOURCE(o)->userdata;
411
412     switch (code) {
413
414         case PA_SOURCE_MESSAGE_GET_LATENCY:
415
416             /* The source is _put() before the source output is, so let's
417              * make sure we don't access it in that time. Also, the
418              * source output is first shut down, the source second. */
419             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
420                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
421                 *((pa_usec_t*) data) = 0;
422                 return 0;
423             }
424
425             *((pa_usec_t*) data) =
426
427                 /* Get the latency of the master source */
428                 pa_source_get_latency_within_thread(u->source_output->source) +
429                 /* Add the latency internal to our source output on top */
430                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
431                 /* and the buffering we do on the source */
432                 pa_bytes_to_usec(u->source_output_blocksize, &u->source_output->source->sample_spec);
433
434             return 0;
435
436         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
437             u->thread_info.current_volume = u->source->reference_volume;
438             break;
439     }
440
441     return pa_source_process_msg(o, code, data, offset, chunk);
442 }
443
444 /* Called from sink I/O thread context */
445 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
446     struct userdata *u = PA_SINK(o)->userdata;
447
448     switch (code) {
449
450         case PA_SINK_MESSAGE_GET_LATENCY:
451
452             /* The sink is _put() before the sink input is, so let's
453              * make sure we don't access it in that time. Also, the
454              * sink input is first shut down, the sink second. */
455             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
456                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
457                 *((pa_usec_t*) data) = 0;
458                 return 0;
459             }
460
461             *((pa_usec_t*) data) =
462
463                 /* Get the latency of the master sink */
464                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
465
466                 /* Add the latency internal to our sink input on top */
467                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
468
469             return 0;
470     }
471
472     return pa_sink_process_msg(o, code, data, offset, chunk);
473 }
474
475 /* Called from main context */
476 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
477     struct userdata *u;
478
479     pa_source_assert_ref(s);
480     pa_assert_se(u = s->userdata);
481
482     if (!PA_SOURCE_IS_LINKED(state) ||
483         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
484         return 0;
485
486     if (state == PA_SOURCE_RUNNING) {
487         /* restart timer when both sink and source are active */
488         if (IS_ACTIVE(u) && u->adjust_time)
489             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
490
491         pa_atomic_store(&u->request_resync, 1);
492         pa_source_output_cork(u->source_output, false);
493     } else if (state == PA_SOURCE_SUSPENDED) {
494         pa_source_output_cork(u->source_output, true);
495     }
496
497     return 0;
498 }
499
500 /* Called from main context */
501 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
502     struct userdata *u;
503
504     pa_sink_assert_ref(s);
505     pa_assert_se(u = s->userdata);
506
507     if (!PA_SINK_IS_LINKED(state) ||
508         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
509         return 0;
510
511     if (state == PA_SINK_RUNNING) {
512         /* restart timer when both sink and source are active */
513         if (IS_ACTIVE(u) && u->adjust_time)
514             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
515
516         pa_atomic_store(&u->request_resync, 1);
517         pa_sink_input_cork(u->sink_input, false);
518     } else if (state == PA_SINK_SUSPENDED) {
519         pa_sink_input_cork(u->sink_input, true);
520     }
521
522     return 0;
523 }
524
525 /* Called from source I/O thread context */
526 static void source_update_requested_latency_cb(pa_source *s) {
527     struct userdata *u;
528
529     pa_source_assert_ref(s);
530     pa_assert_se(u = s->userdata);
531
532     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
533         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
534         return;
535
536     pa_log_debug("Source update requested latency");
537
538     /* Just hand this one over to the master source */
539     pa_source_output_set_requested_latency_within_thread(
540             u->source_output,
541             pa_source_get_requested_latency_within_thread(s));
542 }
543
544 /* Called from sink I/O thread context */
545 static void sink_update_requested_latency_cb(pa_sink *s) {
546     struct userdata *u;
547
548     pa_sink_assert_ref(s);
549     pa_assert_se(u = s->userdata);
550
551     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
552         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
553         return;
554
555     pa_log_debug("Sink update requested latency");
556
557     /* Just hand this one over to the master sink */
558     pa_sink_input_set_requested_latency_within_thread(
559             u->sink_input,
560             pa_sink_get_requested_latency_within_thread(s));
561 }
562
563 /* Called from sink I/O thread context */
564 static void sink_request_rewind_cb(pa_sink *s) {
565     struct userdata *u;
566
567     pa_sink_assert_ref(s);
568     pa_assert_se(u = s->userdata);
569
570     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
571         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
572         return;
573
574     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
575
576     /* Just hand this one over to the master sink */
577     pa_sink_input_request_rewind(u->sink_input,
578                                  s->thread_info.rewind_nbytes, true, false, false);
579 }
580
581 /* Called from main context */
582 static void source_set_volume_cb(pa_source *s) {
583     struct userdata *u;
584
585     pa_source_assert_ref(s);
586     pa_assert_se(u = s->userdata);
587
588     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
589         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
590         return;
591
592     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, true);
593 }
594
595 /* Called from main context */
596 static void sink_set_volume_cb(pa_sink *s) {
597     struct userdata *u;
598
599     pa_sink_assert_ref(s);
600     pa_assert_se(u = s->userdata);
601
602     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
603         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
604         return;
605
606     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, true);
607 }
608
609 /* Called from main context. */
610 static void source_get_volume_cb(pa_source *s) {
611     struct userdata *u;
612     pa_cvolume v;
613
614     pa_source_assert_ref(s);
615     pa_assert_se(u = s->userdata);
616
617     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
618         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
619         return;
620
621     pa_source_output_get_volume(u->source_output, &v, true);
622
623     if (pa_cvolume_equal(&s->real_volume, &v))
624         /* no change */
625         return;
626
627     s->real_volume = v;
628     pa_source_set_soft_volume(s, NULL);
629 }
630
631 /* Called from main context */
632 static void source_set_mute_cb(pa_source *s) {
633     struct userdata *u;
634
635     pa_source_assert_ref(s);
636     pa_assert_se(u = s->userdata);
637
638     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
639         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
640         return;
641
642     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
643 }
644
645 /* Called from main context */
646 static void sink_set_mute_cb(pa_sink *s) {
647     struct userdata *u;
648
649     pa_sink_assert_ref(s);
650     pa_assert_se(u = s->userdata);
651
652     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
653         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
654         return;
655
656     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
657 }
658
659 /* Called from main context */
660 static void source_get_mute_cb(pa_source *s) {
661     struct userdata *u;
662
663     pa_source_assert_ref(s);
664     pa_assert_se(u = s->userdata);
665
666     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
667         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
668         return;
669
670     pa_source_output_get_mute(u->source_output);
671 }
672
673 /* Called from source I/O thread context. */
674 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
675     int64_t diff;
676
677     if (diff_time < 0) {
678         diff = pa_usec_to_bytes(-diff_time, &u->sink_input->sample_spec);
679
680         if (diff > 0) {
681             /* add some extra safety samples to compensate for jitter in the
682              * timings */
683             diff += 10 * pa_frame_size (&u->sink_input->sample_spec);
684
685             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
686
687             u->sink_skip = diff;
688             u->source_skip = 0;
689         }
690     } else if (diff_time > 0) {
691         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
692
693         if (diff > 0) {
694             pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
695
696             u->source_skip = diff;
697             u->sink_skip = 0;
698         }
699     }
700 }
701
702 /* Called from source I/O thread context. */
703 static void do_resync(struct userdata *u) {
704     int64_t diff_time;
705     struct snapshot latency_snapshot;
706
707     pa_log("Doing resync");
708
709     /* update our snapshot */
710     source_output_snapshot_within_thread(u, &latency_snapshot);
711     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
712
713     /* calculate drift between capture and playback */
714     diff_time = calc_diff(u, &latency_snapshot);
715
716     /* and adjust for the drift */
717     apply_diff_time(u, diff_time);
718 }
719
720 /* 1. Calculate drift at this point, pass to canceller
721  * 2. Push out playback samples in blocksize chunks
722  * 3. Push out capture samples in blocksize chunks
723  * 4. ???
724  * 5. Profit
725  *
726  * Called from source I/O thread context.
727  */
728 static void do_push_drift_comp(struct userdata *u) {
729     size_t rlen, plen;
730     pa_memchunk rchunk, pchunk, cchunk;
731     uint8_t *rdata, *pdata, *cdata;
732     float drift;
733     int unused PA_GCC_UNUSED;
734
735     rlen = pa_memblockq_get_length(u->source_memblockq);
736     plen = pa_memblockq_get_length(u->sink_memblockq);
737
738     /* Estimate snapshot drift as follows:
739      *   pd: amount of data consumed since last time
740      *   rd: amount of data consumed since last time
741      *
742      *   drift = (pd - rd) / rd;
743      *
744      * We calculate pd and rd as the memblockq length less the number of
745      * samples left from the last iteration (to avoid double counting
746      * those remainder samples.
747      */
748     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
749     u->sink_rem = plen % u->sink_blocksize;
750     u->source_rem = rlen % u->source_output_blocksize;
751
752     /* Now let the canceller work its drift compensation magic */
753     u->ec->set_drift(u->ec, drift);
754
755     if (u->save_aec) {
756         if (u->drift_file)
757             fprintf(u->drift_file, "d %a\n", drift);
758     }
759
760     /* Send in the playback samples first */
761     while (plen >= u->sink_blocksize) {
762         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
763         pdata = pa_memblock_acquire(pchunk.memblock);
764         pdata += pchunk.index;
765
766         u->ec->play(u->ec, pdata);
767
768         if (u->save_aec) {
769             if (u->drift_file)
770                 fprintf(u->drift_file, "p %d\n", u->sink_blocksize);
771             if (u->played_file)
772                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
773         }
774
775         pa_memblock_release(pchunk.memblock);
776         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
777         pa_memblock_unref(pchunk.memblock);
778
779         plen -= u->sink_blocksize;
780     }
781
782     /* And now the capture samples */
783     while (rlen >= u->source_output_blocksize) {
784         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
785
786         rdata = pa_memblock_acquire(rchunk.memblock);
787         rdata += rchunk.index;
788
789         cchunk.index = 0;
790         cchunk.length = u->source_output_blocksize;
791         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
792         cdata = pa_memblock_acquire(cchunk.memblock);
793
794         u->ec->record(u->ec, rdata, cdata);
795
796         if (u->save_aec) {
797             if (u->drift_file)
798                 fprintf(u->drift_file, "c %d\n", u->source_output_blocksize);
799             if (u->captured_file)
800                 unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
801             if (u->canceled_file)
802                 unused = fwrite(cdata, 1, u->source_output_blocksize, u->canceled_file);
803         }
804
805         pa_memblock_release(cchunk.memblock);
806         pa_memblock_release(rchunk.memblock);
807
808         pa_memblock_unref(rchunk.memblock);
809
810         pa_source_post(u->source, &cchunk);
811         pa_memblock_unref(cchunk.memblock);
812
813         pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
814         rlen -= u->source_output_blocksize;
815     }
816 }
817
818 /* This one's simpler than the drift compensation case -- we just iterate over
819  * the capture buffer, and pass the canceller blocksize bytes of playback and
820  * capture data.
821  *
822  * Called from source I/O thread context. */
823 static void do_push(struct userdata *u) {
824     size_t rlen, plen;
825     pa_memchunk rchunk, pchunk, cchunk;
826     uint8_t *rdata, *pdata, *cdata;
827     int unused PA_GCC_UNUSED;
828
829     rlen = pa_memblockq_get_length(u->source_memblockq);
830     plen = pa_memblockq_get_length(u->sink_memblockq);
831
832     while (rlen >= u->source_output_blocksize) {
833
834         /* take fixed blocks from recorded and played samples */
835         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_output_blocksize, &rchunk);
836         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
837
838         /* we ran out of played data and pchunk has been filled with silence bytes */
839         if (plen < u->sink_blocksize)
840             pa_memblockq_seek(u->sink_memblockq, u->sink_blocksize - plen, PA_SEEK_RELATIVE, true);
841
842         rdata = pa_memblock_acquire(rchunk.memblock);
843         rdata += rchunk.index;
844         pdata = pa_memblock_acquire(pchunk.memblock);
845         pdata += pchunk.index;
846
847         cchunk.index = 0;
848         cchunk.length = u->source_blocksize;
849         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
850         cdata = pa_memblock_acquire(cchunk.memblock);
851
852         if (u->save_aec) {
853             if (u->captured_file)
854                 unused = fwrite(rdata, 1, u->source_output_blocksize, u->captured_file);
855             if (u->played_file)
856                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
857         }
858
859         /* perform echo cancellation */
860         u->ec->run(u->ec, rdata, pdata, cdata);
861
862         if (u->save_aec) {
863             if (u->canceled_file)
864                 unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
865         }
866
867         pa_memblock_release(cchunk.memblock);
868         pa_memblock_release(pchunk.memblock);
869         pa_memblock_release(rchunk.memblock);
870
871         /* drop consumed source samples */
872         pa_memblockq_drop(u->source_memblockq, u->source_output_blocksize);
873         pa_memblock_unref(rchunk.memblock);
874         rlen -= u->source_output_blocksize;
875
876         /* drop consumed sink samples */
877         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
878         pa_memblock_unref(pchunk.memblock);
879
880         if (plen >= u->sink_blocksize)
881             plen -= u->sink_blocksize;
882         else
883             plen = 0;
884
885         /* forward the (echo-canceled) data to the virtual source */
886         pa_source_post(u->source, &cchunk);
887         pa_memblock_unref(cchunk.memblock);
888     }
889 }
890
891 /* Called from source I/O thread context. */
892 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
893     struct userdata *u;
894     size_t rlen, plen, to_skip;
895     pa_memchunk rchunk;
896
897     pa_source_output_assert_ref(o);
898     pa_source_output_assert_io_context(o);
899     pa_assert_se(u = o->userdata);
900
901     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
902         pa_log("Push when no link?");
903         return;
904     }
905
906     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
907                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
908         pa_source_post(u->source, chunk);
909         return;
910     }
911
912     /* handle queued messages, do any message sending of our own */
913     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
914         ;
915
916     pa_memblockq_push_align(u->source_memblockq, chunk);
917
918     rlen = pa_memblockq_get_length(u->source_memblockq);
919     plen = pa_memblockq_get_length(u->sink_memblockq);
920
921     /* Let's not do anything else till we have enough data to process */
922     if (rlen < u->source_output_blocksize)
923         return;
924
925     /* See if we need to drop samples in order to sync */
926     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
927         do_resync(u);
928     }
929
930     /* Okay, skip cancellation for skipped source samples if needed. */
931     if (PA_UNLIKELY(u->source_skip)) {
932         /* The slightly tricky bit here is that we drop all but modulo
933          * blocksize bytes and then adjust for that last bit on the sink side.
934          * We do this because the source data is coming at a fixed rate, which
935          * means the only way to try to catch up is drop sink samples and let
936          * the canceller cope up with this. */
937         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
938         to_skip -= to_skip % u->source_output_blocksize;
939
940         if (to_skip) {
941             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
942             pa_source_post(u->source, &rchunk);
943
944             pa_memblock_unref(rchunk.memblock);
945             pa_memblockq_drop(u->source_memblockq, to_skip);
946
947             rlen -= to_skip;
948             u->source_skip -= to_skip;
949         }
950
951         if (rlen && u->source_skip % u->source_output_blocksize) {
952             u->sink_skip += (uint64_t) (u->source_output_blocksize - (u->source_skip % u->source_output_blocksize)) * u->sink_blocksize / u->source_output_blocksize;
953             u->source_skip -= (u->source_skip % u->source_output_blocksize);
954         }
955     }
956
957     /* And for the sink, these samples have been played back already, so we can
958      * just drop them and get on with it. */
959     if (PA_UNLIKELY(u->sink_skip)) {
960         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
961
962         pa_memblockq_drop(u->sink_memblockq, to_skip);
963
964         plen -= to_skip;
965         u->sink_skip -= to_skip;
966     }
967
968     /* process and push out samples */
969     if (u->ec->params.drift_compensation)
970         do_push_drift_comp(u);
971     else
972         do_push(u);
973 }
974
975 /* Called from sink I/O thread context. */
976 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
977     struct userdata *u;
978
979     pa_sink_input_assert_ref(i);
980     pa_assert(chunk);
981     pa_assert_se(u = i->userdata);
982
983     if (u->sink->thread_info.rewind_requested)
984         pa_sink_process_rewind(u->sink, 0);
985
986     pa_sink_render_full(u->sink, nbytes, chunk);
987
988     if (i->thread_info.underrun_for > 0) {
989         pa_log_debug("Handling end of underrun.");
990         pa_atomic_store(&u->request_resync, 1);
991     }
992
993     /* let source thread handle the chunk. pass the sample count as well so that
994      * the source IO thread can update the right variables. */
995     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
996         NULL, 0, chunk, NULL);
997     u->send_counter += chunk->length;
998
999     return 0;
1000 }
1001
1002 /* Called from source I/O thread context. */
1003 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
1004     struct userdata *u;
1005
1006     pa_source_output_assert_ref(o);
1007     pa_source_output_assert_io_context(o);
1008     pa_assert_se(u = o->userdata);
1009
1010     pa_source_process_rewind(u->source, nbytes);
1011
1012     /* go back on read side, we need to use older sink data for this */
1013     pa_memblockq_rewind(u->sink_memblockq, nbytes);
1014
1015     /* manipulate write index */
1016     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, true);
1017
1018     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
1019         (long long) pa_memblockq_get_length (u->source_memblockq));
1020 }
1021
1022 /* Called from sink I/O thread context. */
1023 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1024     struct userdata *u;
1025
1026     pa_sink_input_assert_ref(i);
1027     pa_assert_se(u = i->userdata);
1028
1029     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1030
1031     pa_sink_process_rewind(u->sink, nbytes);
1032
1033     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1034     u->send_counter -= nbytes;
1035 }
1036
1037 /* Called from source I/O thread context. */
1038 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1039     size_t delay, rlen, plen;
1040     pa_usec_t now, latency;
1041
1042     now = pa_rtclock_now();
1043     latency = pa_source_get_latency_within_thread(u->source_output->source);
1044     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1045
1046     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1047     rlen = pa_memblockq_get_length(u->source_memblockq);
1048     plen = pa_memblockq_get_length(u->sink_memblockq);
1049
1050     snapshot->source_now = now;
1051     snapshot->source_latency = latency;
1052     snapshot->source_delay = delay;
1053     snapshot->recv_counter = u->recv_counter;
1054     snapshot->rlen = rlen + u->sink_skip;
1055     snapshot->plen = plen + u->source_skip;
1056 }
1057
1058 /* Called from source I/O thread context. */
1059 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1060     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1061
1062     switch (code) {
1063
1064         case SOURCE_OUTPUT_MESSAGE_POST:
1065
1066             pa_source_output_assert_io_context(u->source_output);
1067
1068             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1069                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1070             else
1071                 pa_memblockq_flush_write(u->sink_memblockq, true);
1072
1073             u->recv_counter += (int64_t) chunk->length;
1074
1075             return 0;
1076
1077         case SOURCE_OUTPUT_MESSAGE_REWIND:
1078             pa_source_output_assert_io_context(u->source_output);
1079
1080             /* manipulate write index, never go past what we have */
1081             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1082                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, true);
1083             else
1084                 pa_memblockq_flush_write(u->sink_memblockq, true);
1085
1086             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1087
1088             u->recv_counter -= offset;
1089
1090             return 0;
1091
1092         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1093             struct snapshot *snapshot = (struct snapshot *) data;
1094
1095             source_output_snapshot_within_thread(u, snapshot);
1096             return 0;
1097         }
1098
1099         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1100             apply_diff_time(u, offset);
1101             return 0;
1102
1103     }
1104
1105     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1106 }
1107
1108 /* Called from sink I/O thread context. */
1109 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1110     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1111
1112     switch (code) {
1113
1114         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1115             size_t delay;
1116             pa_usec_t now, latency;
1117             struct snapshot *snapshot = (struct snapshot *) data;
1118
1119             pa_sink_input_assert_io_context(u->sink_input);
1120
1121             now = pa_rtclock_now();
1122             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1123             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1124
1125             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1126
1127             snapshot->sink_now = now;
1128             snapshot->sink_latency = latency;
1129             snapshot->sink_delay = delay;
1130             snapshot->send_counter = u->send_counter;
1131             return 0;
1132         }
1133     }
1134
1135     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1136 }
1137
1138 /* Called from sink I/O thread context. */
1139 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1140     struct userdata *u;
1141
1142     pa_sink_input_assert_ref(i);
1143     pa_assert_se(u = i->userdata);
1144
1145     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1146
1147     /* FIXME: Too small max_rewind:
1148      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1149     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1150     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1151 }
1152
1153 /* Called from source I/O thread context. */
1154 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1155     struct userdata *u;
1156
1157     pa_source_output_assert_ref(o);
1158     pa_assert_se(u = o->userdata);
1159
1160     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1161
1162     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1163 }
1164
1165 /* Called from sink I/O thread context. */
1166 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1167     struct userdata *u;
1168
1169     pa_sink_input_assert_ref(i);
1170     pa_assert_se(u = i->userdata);
1171
1172     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1173
1174     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1175 }
1176
1177 /* Called from sink I/O thread context. */
1178 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1179     struct userdata *u;
1180     pa_usec_t latency;
1181
1182     pa_sink_input_assert_ref(i);
1183     pa_assert_se(u = i->userdata);
1184
1185     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1186
1187     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1188 }
1189
1190 /* Called from source I/O thread context. */
1191 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1192     struct userdata *u;
1193     pa_usec_t latency;
1194
1195     pa_source_output_assert_ref(o);
1196     pa_assert_se(u = o->userdata);
1197
1198     latency = pa_source_get_requested_latency_within_thread(o->source);
1199
1200     pa_log_debug("Source output update requested latency %lld", (long long) latency);
1201 }
1202
1203 /* Called from sink I/O thread context. */
1204 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1205     struct userdata *u;
1206
1207     pa_sink_input_assert_ref(i);
1208     pa_assert_se(u = i->userdata);
1209
1210     pa_log_debug("Sink input update latency range %lld %lld",
1211         (long long) i->sink->thread_info.min_latency,
1212         (long long) i->sink->thread_info.max_latency);
1213
1214     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1215 }
1216
1217 /* Called from source I/O thread context. */
1218 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1219     struct userdata *u;
1220
1221     pa_source_output_assert_ref(o);
1222     pa_assert_se(u = o->userdata);
1223
1224     pa_log_debug("Source output update latency range %lld %lld",
1225         (long long) o->source->thread_info.min_latency,
1226         (long long) o->source->thread_info.max_latency);
1227
1228     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1229 }
1230
1231 /* Called from sink I/O thread context. */
1232 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1233     struct userdata *u;
1234
1235     pa_sink_input_assert_ref(i);
1236     pa_assert_se(u = i->userdata);
1237
1238     pa_log_debug("Sink input update fixed latency %lld",
1239         (long long) i->sink->thread_info.fixed_latency);
1240
1241     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1242 }
1243
1244 /* Called from source I/O thread context. */
1245 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1246     struct userdata *u;
1247
1248     pa_source_output_assert_ref(o);
1249     pa_assert_se(u = o->userdata);
1250
1251     pa_log_debug("Source output update fixed latency %lld",
1252         (long long) o->source->thread_info.fixed_latency);
1253
1254     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1255 }
1256
1257 /* Called from source I/O thread context. */
1258 static void source_output_attach_cb(pa_source_output *o) {
1259     struct userdata *u;
1260
1261     pa_source_output_assert_ref(o);
1262     pa_source_output_assert_io_context(o);
1263     pa_assert_se(u = o->userdata);
1264
1265     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1266     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1267     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1268     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1269
1270     pa_log_debug("Source output %d attach", o->index);
1271
1272     pa_source_attach_within_thread(u->source);
1273
1274     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1275             o->source->thread_info.rtpoll,
1276             PA_RTPOLL_LATE,
1277             u->asyncmsgq);
1278 }
1279
1280 /* Called from sink I/O thread context. */
1281 static void sink_input_attach_cb(pa_sink_input *i) {
1282     struct userdata *u;
1283
1284     pa_sink_input_assert_ref(i);
1285     pa_assert_se(u = i->userdata);
1286
1287     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1288     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1289
1290     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1291      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1292     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1293
1294     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1295      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1296      * HERE. SEE (6) */
1297     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1298
1299     /* FIXME: Too small max_rewind:
1300      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1301     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1302
1303     pa_log_debug("Sink input %d attach", i->index);
1304
1305     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1306             i->sink->thread_info.rtpoll,
1307             PA_RTPOLL_LATE,
1308             u->asyncmsgq);
1309
1310     pa_sink_attach_within_thread(u->sink);
1311 }
1312
1313 /* Called from source I/O thread context. */
1314 static void source_output_detach_cb(pa_source_output *o) {
1315     struct userdata *u;
1316
1317     pa_source_output_assert_ref(o);
1318     pa_source_output_assert_io_context(o);
1319     pa_assert_se(u = o->userdata);
1320
1321     pa_source_detach_within_thread(u->source);
1322     pa_source_set_rtpoll(u->source, NULL);
1323
1324     pa_log_debug("Source output %d detach", o->index);
1325
1326     if (u->rtpoll_item_read) {
1327         pa_rtpoll_item_free(u->rtpoll_item_read);
1328         u->rtpoll_item_read = NULL;
1329     }
1330 }
1331
1332 /* Called from sink I/O thread context. */
1333 static void sink_input_detach_cb(pa_sink_input *i) {
1334     struct userdata *u;
1335
1336     pa_sink_input_assert_ref(i);
1337     pa_assert_se(u = i->userdata);
1338
1339     pa_sink_detach_within_thread(u->sink);
1340
1341     pa_sink_set_rtpoll(u->sink, NULL);
1342
1343     pa_log_debug("Sink input %d detach", i->index);
1344
1345     if (u->rtpoll_item_write) {
1346         pa_rtpoll_item_free(u->rtpoll_item_write);
1347         u->rtpoll_item_write = NULL;
1348     }
1349 }
1350
1351 /* Called from source I/O thread context. */
1352 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1353     struct userdata *u;
1354
1355     pa_source_output_assert_ref(o);
1356     pa_source_output_assert_io_context(o);
1357     pa_assert_se(u = o->userdata);
1358
1359     pa_log_debug("Source output %d state %d", o->index, state);
1360 }
1361
1362 /* Called from sink I/O thread context. */
1363 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1364     struct userdata *u;
1365
1366     pa_sink_input_assert_ref(i);
1367     pa_assert_se(u = i->userdata);
1368
1369     pa_log_debug("Sink input %d state %d", i->index, state);
1370
1371     /* If we are added for the first time, ask for a rewinding so that
1372      * we are heard right-away. */
1373     if (PA_SINK_INPUT_IS_LINKED(state) &&
1374         i->thread_info.state == PA_SINK_INPUT_INIT) {
1375         pa_log_debug("Requesting rewind due to state change.");
1376         pa_sink_input_request_rewind(i, 0, false, true, true);
1377     }
1378 }
1379
1380 /* Called from main context. */
1381 static void source_output_kill_cb(pa_source_output *o) {
1382     struct userdata *u;
1383
1384     pa_source_output_assert_ref(o);
1385     pa_assert_ctl_context();
1386     pa_assert_se(u = o->userdata);
1387
1388     u->dead = true;
1389
1390     /* The order here matters! We first kill the source output, followed
1391      * by the source. That means the source callbacks must be protected
1392      * against an unconnected source output! */
1393     pa_source_output_unlink(u->source_output);
1394     pa_source_unlink(u->source);
1395
1396     pa_source_output_unref(u->source_output);
1397     u->source_output = NULL;
1398
1399     pa_source_unref(u->source);
1400     u->source = NULL;
1401
1402     pa_log_debug("Source output kill %d", o->index);
1403
1404     pa_module_unload_request(u->module, true);
1405 }
1406
1407 /* Called from main context */
1408 static void sink_input_kill_cb(pa_sink_input *i) {
1409     struct userdata *u;
1410
1411     pa_sink_input_assert_ref(i);
1412     pa_assert_se(u = i->userdata);
1413
1414     u->dead = true;
1415
1416     /* The order here matters! We first kill the sink input, followed
1417      * by the sink. That means the sink callbacks must be protected
1418      * against an unconnected sink input! */
1419     pa_sink_input_unlink(u->sink_input);
1420     pa_sink_unlink(u->sink);
1421
1422     pa_sink_input_unref(u->sink_input);
1423     u->sink_input = NULL;
1424
1425     pa_sink_unref(u->sink);
1426     u->sink = NULL;
1427
1428     pa_log_debug("Sink input kill %d", i->index);
1429
1430     pa_module_unload_request(u->module, true);
1431 }
1432
1433 /* Called from main context. */
1434 static bool source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1435     struct userdata *u;
1436
1437     pa_source_output_assert_ref(o);
1438     pa_assert_ctl_context();
1439     pa_assert_se(u = o->userdata);
1440
1441     if (u->dead || u->autoloaded)
1442         return false;
1443
1444     return (u->source != dest) && (u->sink != dest->monitor_of);
1445 }
1446
1447 /* Called from main context */
1448 static bool sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1449     struct userdata *u;
1450
1451     pa_sink_input_assert_ref(i);
1452     pa_assert_se(u = i->userdata);
1453
1454     if (u->dead || u->autoloaded)
1455         return false;
1456
1457     return u->sink != dest;
1458 }
1459
1460 /* Called from main context. */
1461 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1462     struct userdata *u;
1463
1464     pa_source_output_assert_ref(o);
1465     pa_assert_ctl_context();
1466     pa_assert_se(u = o->userdata);
1467
1468     if (dest) {
1469         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1470         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1471     } else
1472         pa_source_set_asyncmsgq(u->source, NULL);
1473
1474     if (u->source_auto_desc && dest) {
1475         const char *y, *z;
1476         pa_proplist *pl;
1477
1478         pl = pa_proplist_new();
1479         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1480         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1481         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1482                 y ? y : u->sink_input->sink->name);
1483
1484         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1485         pa_proplist_free(pl);
1486     }
1487 }
1488
1489 /* Called from main context */
1490 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1491     struct userdata *u;
1492
1493     pa_sink_input_assert_ref(i);
1494     pa_assert_se(u = i->userdata);
1495
1496     if (dest) {
1497         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1498         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1499     } else
1500         pa_sink_set_asyncmsgq(u->sink, NULL);
1501
1502     if (u->sink_auto_desc && dest) {
1503         const char *y, *z;
1504         pa_proplist *pl;
1505
1506         pl = pa_proplist_new();
1507         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1508         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1509         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1510                          y ? y : u->source_output->source->name);
1511
1512         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1513         pa_proplist_free(pl);
1514     }
1515 }
1516
1517 /* Called from main context */
1518 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1519     struct userdata *u;
1520
1521     pa_sink_input_assert_ref(i);
1522     pa_assert_se(u = i->userdata);
1523
1524     pa_sink_volume_changed(u->sink, &i->volume);
1525 }
1526
1527 /* Called from main context */
1528 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1529     struct userdata *u;
1530
1531     pa_sink_input_assert_ref(i);
1532     pa_assert_se(u = i->userdata);
1533
1534     pa_sink_mute_changed(u->sink, i->muted);
1535 }
1536
1537 /* Called from main context */
1538 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1539     struct pa_echo_canceller_msg *msg;
1540     struct userdata *u;
1541
1542     pa_assert(o);
1543
1544     msg = PA_ECHO_CANCELLER_MSG(o);
1545     u = msg->userdata;
1546
1547     switch (code) {
1548         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1549             pa_cvolume *v = (pa_cvolume *) userdata;
1550
1551             if (u->use_volume_sharing)
1552                 pa_source_set_volume(u->source, v, true, false);
1553             else
1554                 pa_source_output_set_volume(u->source_output, v, false, true);
1555
1556             break;
1557         }
1558
1559         default:
1560             pa_assert_not_reached();
1561             break;
1562     }
1563
1564     return 0;
1565 }
1566
1567 /* Called by the canceller, so source I/O thread context. */
1568 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1569     *v = ec->msg->userdata->thread_info.current_volume;
1570 }
1571
1572 /* Called by the canceller, so source I/O thread context. */
1573 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1574     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1575         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1576
1577         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1578                 pa_xfree);
1579     }
1580 }
1581
1582 uint32_t pa_echo_canceller_blocksize_power2(unsigned rate, unsigned ms) {
1583     unsigned nframes = (rate * ms) / 1000;
1584     uint32_t y = 1 << ((8 * sizeof(uint32_t)) - 2);
1585
1586     assert(rate >= 4000);
1587     assert(ms >= 1);
1588
1589     /* nframes should be a power of 2, round down to nearest power of two */
1590     while (y > nframes)
1591         y >>= 1;
1592
1593     assert(y >= 1);
1594     return y;
1595 }
1596
1597 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1598     if (pa_streq(method, "null"))
1599         return PA_ECHO_CANCELLER_NULL;
1600 #ifdef HAVE_SPEEX
1601     if (pa_streq(method, "speex"))
1602         return PA_ECHO_CANCELLER_SPEEX;
1603 #endif
1604 #ifdef HAVE_ADRIAN_EC
1605     if (pa_streq(method, "adrian"))
1606         return PA_ECHO_CANCELLER_ADRIAN;
1607 #endif
1608 #ifdef HAVE_WEBRTC
1609     if (pa_streq(method, "webrtc"))
1610         return PA_ECHO_CANCELLER_WEBRTC;
1611 #endif
1612     return PA_ECHO_CANCELLER_INVALID;
1613 }
1614
1615 static int extension_cb(pa_native_protocol *p, pa_module *m, pa_native_connection *c, uint32_t tag, pa_tagstruct *t) {
1616         uint32_t command;
1617         uint32_t value;
1618         struct userdata *u = NULL;
1619         pa_tagstruct *reply = NULL;
1620         pa_assert(p);
1621         pa_assert(m);
1622         pa_assert(c);
1623         pa_assert(t);
1624
1625         u = m->userdata;
1626
1627         if (pa_tagstruct_getu32(t, &command) < 0)
1628         goto fail;
1629
1630         reply = pa_tagstruct_new(NULL, 0);
1631         pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
1632         pa_tagstruct_putu32(reply, tag);
1633
1634         switch (command) {
1635                 case AEC_SET_VOLUME: {
1636                         pa_tagstruct_getu32(t,&value);
1637                         pa_log_debug("AEC_SET_VOLUME in echo cancel = %d",value);
1638                 break;
1639         }
1640                 case AEC_SET_DEVICE: {
1641                         pa_tagstruct_getu32(t,&value);
1642                         pa_log_debug("AEC_SET_DEVICE in echo cancel = %d",value);
1643                 break;
1644         }
1645         default:
1646                 goto fail;
1647         }
1648         pa_pstream_send_tagstruct(pa_native_connection_get_pstream(c), reply);
1649         return 0;
1650
1651 fail:
1652         return -1;
1653 }
1654
1655 /* Common initialisation bits between module-echo-cancel and the standalone
1656  * test program.
1657  *
1658  * Called from main context. */
1659 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1660     const char *ec_string;
1661     pa_echo_canceller_method_t ec_method;
1662
1663     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1664         pa_log("Invalid sample format specification or channel map");
1665         goto fail;
1666     }
1667
1668     u->ec = pa_xnew0(pa_echo_canceller, 1);
1669     if (!u->ec) {
1670         pa_log("Failed to alloc echo canceller");
1671         goto fail;
1672     }
1673
1674     ec_string = pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER);
1675     if ((ec_method = get_ec_method_from_string(ec_string)) < 0) {
1676         pa_log("Invalid echo canceller implementation '%s'", ec_string);
1677         goto fail;
1678     }
1679
1680     pa_log_info("Using AEC engine: %s", ec_string);
1681
1682     u->ec->init = ec_table[ec_method].init;
1683     u->ec->play = ec_table[ec_method].play;
1684     u->ec->record = ec_table[ec_method].record;
1685     u->ec->set_drift = ec_table[ec_method].set_drift;
1686     u->ec->run = ec_table[ec_method].run;
1687     u->ec->done = ec_table[ec_method].done;
1688
1689     return 0;
1690
1691 fail:
1692     return -1;
1693 }
1694
1695 /* Called from main context. */
1696 int pa__init(pa_module*m) {
1697     struct userdata *u;
1698     pa_sample_spec source_output_ss, source_ss, sink_ss;
1699     pa_channel_map source_output_map, source_map, sink_map;
1700     pa_modargs *ma;
1701     pa_source *source_master=NULL;
1702     pa_sink *sink_master=NULL;
1703     pa_source_output_new_data source_output_data;
1704     pa_sink_input_new_data sink_input_data;
1705     pa_source_new_data source_data;
1706     pa_sink_new_data sink_data;
1707     pa_memchunk silence;
1708     uint32_t temp;
1709     uint32_t nframes = 0;
1710
1711     pa_assert(m);
1712
1713     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1714         pa_log("Failed to parse module arguments.");
1715         goto fail;
1716     }
1717
1718     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1719         pa_log("Master source not found");
1720         goto fail;
1721     }
1722     pa_assert(source_master);
1723
1724     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1725         pa_log("Master sink not found");
1726         goto fail;
1727     }
1728     pa_assert(sink_master);
1729
1730     if (source_master->monitor_of == sink_master) {
1731         pa_log("Can't cancel echo between a sink and its monitor");
1732         goto fail;
1733     }
1734
1735     source_ss = source_master->sample_spec;
1736     source_ss.rate = DEFAULT_RATE;
1737     source_ss.channels = DEFAULT_CHANNELS;
1738     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1739
1740     sink_ss = sink_master->sample_spec;
1741     sink_map = sink_master->channel_map;
1742
1743     u = pa_xnew0(struct userdata, 1);
1744     if (!u) {
1745         pa_log("Failed to alloc userdata");
1746         goto fail;
1747     }
1748     u->core = m->core;
1749     u->module = m;
1750     m->userdata = u;
1751     u->dead = false;
1752
1753     u->use_volume_sharing = true;
1754     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1755         pa_log("use_volume_sharing= expects a boolean argument");
1756         goto fail;
1757     }
1758
1759     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1760     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1761         pa_log("Failed to parse adjust_time value");
1762         goto fail;
1763     }
1764
1765     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1766         u->adjust_time = temp * PA_USEC_PER_SEC;
1767     else
1768         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1769
1770     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1771     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1772         pa_log("Failed to parse adjust_threshold value");
1773         goto fail;
1774     }
1775
1776     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1777         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1778     else
1779         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1780
1781     u->save_aec = DEFAULT_SAVE_AEC;
1782     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1783         pa_log("Failed to parse save_aec value");
1784         goto fail;
1785     }
1786
1787     u->autoloaded = DEFAULT_AUTOLOADED;
1788     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1789         pa_log("Failed to parse autoloaded value");
1790         goto fail;
1791     }
1792
1793     if (init_common(ma, u, &source_ss, &source_map) < 0)
1794         goto fail;
1795
1796     u->asyncmsgq = pa_asyncmsgq_new(0);
1797     u->need_realign = true;
1798
1799     source_output_ss = source_ss;
1800     source_output_map = source_map;
1801
1802     if (sink_ss.rate != source_ss.rate) {
1803         pa_log_info("Sample rates of play and out stream differ. Adjusting rate of play stream.");
1804         sink_ss.rate = source_ss.rate;
1805     }
1806
1807     pa_assert(u->ec->init);
1808     if (!u->ec->init(u->core, u->ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes, pa_modargs_get_value(ma, "aec_args", NULL))) {
1809         pa_log("Failed to init AEC engine");
1810         goto fail;
1811     }
1812
1813     pa_assert(source_output_ss.rate == source_ss.rate);
1814     pa_assert(sink_ss.rate == source_ss.rate);
1815
1816     u->source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
1817     u->source_blocksize = nframes * pa_frame_size(&source_ss);
1818     u->sink_blocksize = nframes * pa_frame_size(&sink_ss);
1819
1820     if (u->ec->params.drift_compensation)
1821         pa_assert(u->ec->set_drift);
1822
1823     /* Create source */
1824     pa_source_new_data_init(&source_data);
1825     source_data.driver = __FILE__;
1826     source_data.module = m;
1827     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1828         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1829     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1830     pa_source_new_data_set_channel_map(&source_data, &source_map);
1831     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1832     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1833     if (!u->autoloaded)
1834         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1835
1836     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1837         pa_log("Invalid properties");
1838         pa_source_new_data_done(&source_data);
1839         goto fail;
1840     }
1841
1842     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1843         const char *y, *z;
1844
1845         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1846         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1847         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1848                 z ? z : source_master->name, y ? y : sink_master->name);
1849     }
1850
1851     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1852                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1853     pa_source_new_data_done(&source_data);
1854
1855     if (!u->source) {
1856         pa_log("Failed to create source.");
1857         goto fail;
1858     }
1859
1860     u->source->parent.process_msg = source_process_msg_cb;
1861     u->source->set_state = source_set_state_cb;
1862     u->source->update_requested_latency = source_update_requested_latency_cb;
1863     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1864     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1865     if (!u->use_volume_sharing) {
1866         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1867         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1868         pa_source_enable_decibel_volume(u->source, true);
1869     }
1870     u->source->userdata = u;
1871
1872     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1873
1874     /* Create sink */
1875     pa_sink_new_data_init(&sink_data);
1876     sink_data.driver = __FILE__;
1877     sink_data.module = m;
1878     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1879         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1880     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1881     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1882     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1883     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1884     if (!u->autoloaded)
1885         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1886
1887     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1888         pa_log("Invalid properties");
1889         pa_sink_new_data_done(&sink_data);
1890         goto fail;
1891     }
1892
1893     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1894         const char *y, *z;
1895
1896         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1897         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1898         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1899                 z ? z : sink_master->name, y ? y : source_master->name);
1900     }
1901
1902     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1903                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1904     pa_sink_new_data_done(&sink_data);
1905
1906     if (!u->sink) {
1907         pa_log("Failed to create sink.");
1908         goto fail;
1909     }
1910
1911     u->sink->parent.process_msg = sink_process_msg_cb;
1912     u->sink->set_state = sink_set_state_cb;
1913     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1914     u->sink->request_rewind = sink_request_rewind_cb;
1915     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1916     if (!u->use_volume_sharing) {
1917         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1918         pa_sink_enable_decibel_volume(u->sink, true);
1919     }
1920     u->sink->userdata = u;
1921
1922     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1923
1924     /* Create source output */
1925     pa_source_output_new_data_init(&source_output_data);
1926     source_output_data.driver = __FILE__;
1927     source_output_data.module = m;
1928     pa_source_output_new_data_set_source(&source_output_data, source_master, false);
1929     source_output_data.destination_source = u->source;
1930
1931     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1932     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1933     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_output_ss);
1934     pa_source_output_new_data_set_channel_map(&source_output_data, &source_output_map);
1935
1936     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1937     pa_source_output_new_data_done(&source_output_data);
1938
1939     if (!u->source_output)
1940         goto fail;
1941
1942     u->source_output->parent.process_msg = source_output_process_msg_cb;
1943     u->source_output->push = source_output_push_cb;
1944     u->source_output->process_rewind = source_output_process_rewind_cb;
1945     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1946     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1947     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1948     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1949     u->source_output->kill = source_output_kill_cb;
1950     u->source_output->attach = source_output_attach_cb;
1951     u->source_output->detach = source_output_detach_cb;
1952     u->source_output->state_change = source_output_state_change_cb;
1953     u->source_output->may_move_to = source_output_may_move_to_cb;
1954     u->source_output->moving = source_output_moving_cb;
1955     u->source_output->userdata = u;
1956
1957     u->source->output_from_master = u->source_output;
1958
1959     /* Create sink input */
1960     pa_sink_input_new_data_init(&sink_input_data);
1961     sink_input_data.driver = __FILE__;
1962     sink_input_data.module = m;
1963     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, false);
1964     sink_input_data.origin_sink = u->sink;
1965     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1966     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1967     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1968     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1969     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1970
1971     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1972     pa_sink_input_new_data_done(&sink_input_data);
1973
1974     if (!u->sink_input)
1975         goto fail;
1976
1977     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1978     u->sink_input->pop = sink_input_pop_cb;
1979     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1980     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1981     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1982     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1983     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1984     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1985     u->sink_input->kill = sink_input_kill_cb;
1986     u->sink_input->attach = sink_input_attach_cb;
1987     u->sink_input->detach = sink_input_detach_cb;
1988     u->sink_input->state_change = sink_input_state_change_cb;
1989     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1990     u->sink_input->moving = sink_input_moving_cb;
1991     if (!u->use_volume_sharing)
1992         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1993     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1994     u->sink_input->userdata = u;
1995
1996     u->sink->input_to_master = u->sink_input;
1997
1998     pa_sink_input_get_silence(u->sink_input, &silence);
1999
2000     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
2001         &source_output_ss, 1, 1, 0, &silence);
2002     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
2003         &sink_ss, 0, 1, 0, &silence);
2004
2005     pa_memblock_unref(silence.memblock);
2006
2007     if (!u->source_memblockq || !u->sink_memblockq) {
2008         pa_log("Failed to create memblockq.");
2009         goto fail;
2010     }
2011
2012     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
2013         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
2014     else if (u->ec->params.drift_compensation) {
2015         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
2016         u->adjust_time = 0;
2017         /* Perform resync just once to give the canceller a leg up */
2018         pa_atomic_store(&u->request_resync, 1);
2019     }
2020
2021     if (u->save_aec) {
2022         pa_log("Creating AEC files in /tmp");
2023         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
2024         if (u->captured_file == NULL)
2025             perror ("fopen failed");
2026         u->played_file = fopen("/tmp/aec_play.sw", "wb");
2027         if (u->played_file == NULL)
2028             perror ("fopen failed");
2029         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
2030         if (u->canceled_file == NULL)
2031             perror ("fopen failed");
2032         if (u->ec->params.drift_compensation) {
2033             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
2034             if (u->drift_file == NULL)
2035                 perror ("fopen failed");
2036         }
2037     }
2038
2039     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
2040     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
2041     u->ec->msg->userdata = u;
2042
2043     u->thread_info.current_volume = u->source->reference_volume;
2044
2045     u->protocol = pa_native_protocol_get(m->core);
2046     pa_native_protocol_install_ext(u->protocol, m, extension_cb);
2047
2048     pa_sink_put(u->sink);
2049     pa_source_put(u->source);
2050
2051     pa_sink_input_put(u->sink_input);
2052     pa_source_output_put(u->source_output);
2053     pa_modargs_free(ma);
2054
2055     return 0;
2056
2057 fail:
2058     if (ma)
2059         pa_modargs_free(ma);
2060
2061     pa__done(m);
2062
2063     return -1;
2064 }
2065
2066 /* Called from main context. */
2067 int pa__get_n_used(pa_module *m) {
2068     struct userdata *u;
2069
2070     pa_assert(m);
2071     pa_assert_se(u = m->userdata);
2072
2073     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
2074 }
2075
2076 /* Called from main context. */
2077 void pa__done(pa_module*m) {
2078     struct userdata *u;
2079
2080     pa_assert(m);
2081
2082     if (!(u = m->userdata))
2083         return;
2084
2085     u->dead = true;
2086
2087     /* See comments in source_output_kill_cb() above regarding
2088      * destruction order! */
2089
2090     if (u->time_event)
2091         u->core->mainloop->time_free(u->time_event);
2092
2093     if (u->source_output)
2094         pa_source_output_unlink(u->source_output);
2095     if (u->sink_input)
2096         pa_sink_input_unlink(u->sink_input);
2097
2098     if (u->source)
2099         pa_source_unlink(u->source);
2100     if (u->sink)
2101         pa_sink_unlink(u->sink);
2102
2103     if (u->source_output)
2104         pa_source_output_unref(u->source_output);
2105     if (u->sink_input)
2106         pa_sink_input_unref(u->sink_input);
2107
2108     if (u->source)
2109         pa_source_unref(u->source);
2110     if (u->sink)
2111         pa_sink_unref(u->sink);
2112
2113     if (u->source_memblockq)
2114         pa_memblockq_free(u->source_memblockq);
2115     if (u->sink_memblockq)
2116         pa_memblockq_free(u->sink_memblockq);
2117
2118     if (u->ec) {
2119         if (u->ec->done)
2120             u->ec->done(u->ec);
2121
2122         pa_xfree(u->ec);
2123     }
2124
2125     if (u->protocol) {
2126         pa_native_protocol_remove_ext(u->protocol, m);
2127         pa_native_protocol_unref(u->protocol);
2128     }
2129
2130     if (u->asyncmsgq)
2131         pa_asyncmsgq_unref(u->asyncmsgq);
2132
2133     if (u->save_aec) {
2134         if (u->played_file)
2135             fclose(u->played_file);
2136         if (u->captured_file)
2137             fclose(u->captured_file);
2138         if (u->canceled_file)
2139             fclose(u->canceled_file);
2140         if (u->drift_file)
2141             fclose(u->drift_file);
2142     }
2143
2144     pa_xfree(u);
2145 }
2146
2147 #ifdef ECHO_CANCEL_TEST
2148 /*
2149  * Stand-alone test program for running in the canceller on pre-recorded files.
2150  */
2151 int main(int argc, char* argv[]) {
2152     struct userdata u;
2153     pa_sample_spec source_output_ss, source_ss, sink_ss;
2154     pa_channel_map source_output_map, source_map, sink_map;
2155     pa_modargs *ma = NULL;
2156     uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2157     int unused PA_GCC_UNUSED;
2158     int ret = 0, i;
2159     char c;
2160     float drift;
2161     uint32_t nframes;
2162
2163     if (!getenv("MAKE_CHECK"))
2164         pa_log_set_level(PA_LOG_DEBUG);
2165
2166     pa_memzero(&u, sizeof(u));
2167
2168     if (argc < 4 || argc > 7) {
2169         goto usage;
2170     }
2171
2172     u.captured_file = fopen(argv[2], "rb");
2173     if (u.captured_file == NULL) {
2174         perror ("Could not open capture file");
2175         goto fail;
2176     }
2177     u.played_file = fopen(argv[1], "rb");
2178     if (u.played_file == NULL) {
2179         perror ("Could not open play file");
2180         goto fail;
2181     }
2182     u.canceled_file = fopen(argv[3], "wb");
2183     if (u.canceled_file == NULL) {
2184         perror ("Could not open canceled file");
2185         goto fail;
2186     }
2187
2188     u.core = pa_xnew0(pa_core, 1);
2189     u.core->cpu_info.cpu_type = PA_CPU_X86;
2190     u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2191
2192     if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2193         pa_log("Failed to parse module arguments.");
2194         goto fail;
2195     }
2196
2197     source_ss.format = PA_SAMPLE_S16LE;
2198     source_ss.rate = DEFAULT_RATE;
2199     source_ss.channels = DEFAULT_CHANNELS;
2200     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2201
2202     sink_ss.format = PA_SAMPLE_S16LE;
2203     sink_ss.rate = DEFAULT_RATE;
2204     sink_ss.channels = DEFAULT_CHANNELS;
2205     pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2206
2207     if (init_common(ma, &u, &source_ss, &source_map) < 0)
2208         goto fail;
2209
2210     source_output_ss = source_ss;
2211     source_output_map = source_map;
2212
2213     if (!u.ec->init(u.core, u.ec, &source_output_ss, &source_output_map, &sink_ss, &sink_map, &source_ss, &source_map, &nframes,
2214                      pa_modargs_get_value(ma, "aec_args", NULL))) {
2215         pa_log("Failed to init AEC engine");
2216         goto fail;
2217     }
2218     u.source_output_blocksize = nframes * pa_frame_size(&source_output_ss);
2219     u.source_blocksize = nframes * pa_frame_size(&source_ss);
2220     u.sink_blocksize = nframes * pa_frame_size(&sink_ss);
2221
2222     if (u.ec->params.drift_compensation) {
2223         if (argc < 6) {
2224             pa_log("Drift compensation enabled but drift file not specified");
2225             goto fail;
2226         }
2227
2228         u.drift_file = fopen(argv[5], "rt");
2229
2230         if (u.drift_file == NULL) {
2231             perror ("Could not open drift file");
2232             goto fail;
2233         }
2234     }
2235
2236     rdata = pa_xmalloc(u.source_output_blocksize);
2237     pdata = pa_xmalloc(u.sink_blocksize);
2238     cdata = pa_xmalloc(u.source_blocksize);
2239
2240     if (!u.ec->params.drift_compensation) {
2241         while (fread(rdata, u.source_output_blocksize, 1, u.captured_file) > 0) {
2242             if (fread(pdata, u.sink_blocksize, 1, u.played_file) == 0) {
2243                 perror("Played file ended before captured file");
2244                 goto fail;
2245             }
2246
2247             u.ec->run(u.ec, rdata, pdata, cdata);
2248
2249             unused = fwrite(cdata, u.source_blocksize, 1, u.canceled_file);
2250         }
2251     } else {
2252         while (fscanf(u.drift_file, "%c", &c) > 0) {
2253             switch (c) {
2254                 case 'd':
2255                     if (!fscanf(u.drift_file, "%a", &drift)) {
2256                         perror("Drift file incomplete");
2257                         goto fail;
2258                     }
2259
2260                     u.ec->set_drift(u.ec, drift);
2261
2262                     break;
2263
2264                 case 'c':
2265                     if (!fscanf(u.drift_file, "%d", &i)) {
2266                         perror("Drift file incomplete");
2267                         goto fail;
2268                     }
2269
2270                     if (fread(rdata, i, 1, u.captured_file) <= 0) {
2271                         perror("Captured file ended prematurely");
2272                         goto fail;
2273                     }
2274
2275                     u.ec->record(u.ec, rdata, cdata);
2276
2277                     unused = fwrite(cdata, i, 1, u.canceled_file);
2278
2279                     break;
2280
2281                 case 'p':
2282                     if (!fscanf(u.drift_file, "%d", &i)) {
2283                         perror("Drift file incomplete");
2284                         goto fail;
2285                     }
2286
2287                     if (fread(pdata, i, 1, u.played_file) <= 0) {
2288                         perror("Played file ended prematurely");
2289                         goto fail;
2290                     }
2291
2292                     u.ec->play(u.ec, pdata);
2293
2294                     break;
2295             }
2296         }
2297
2298         if (fread(rdata, i, 1, u.captured_file) > 0)
2299             pa_log("All capture data was not consumed");
2300         if (fread(pdata, i, 1, u.played_file) > 0)
2301             pa_log("All playback data was not consumed");
2302     }
2303
2304     u.ec->done(u.ec);
2305
2306 out:
2307     if (u.captured_file)
2308         fclose(u.captured_file);
2309     if (u.played_file)
2310         fclose(u.played_file);
2311     if (u.canceled_file)
2312         fclose(u.canceled_file);
2313     if (u.drift_file)
2314         fclose(u.drift_file);
2315
2316     pa_xfree(rdata);
2317     pa_xfree(pdata);
2318     pa_xfree(cdata);
2319
2320     pa_xfree(u.ec);
2321     pa_xfree(u.core);
2322
2323     if (ma)
2324         pa_modargs_free(ma);
2325
2326     return ret;
2327
2328 usage:
2329     pa_log("Usage: %s play_file rec_file out_file [module args] [drift_file]", argv[0]);
2330
2331 fail:
2332     ret = -1;
2333     goto out;
2334 }
2335 #endif /* ECHO_CANCEL_TEST */