echo-cancel: Add function pa_echo_canceller_blocksize_power2()
[platform/upstream/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/timeval.h>
40 #include <pulse/rtclock.h>
41
42 #include <pulsecore/i18n.h>
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/namereg.h>
46 #include <pulsecore/sink.h>
47 #include <pulsecore/module.h>
48 #include <pulsecore/core-rtclock.h>
49 #include <pulsecore/core-util.h>
50 #include <pulsecore/modargs.h>
51 #include <pulsecore/log.h>
52 #include <pulsecore/rtpoll.h>
53 #include <pulsecore/sample-util.h>
54 #include <pulsecore/ltdl-helper.h>
55
56 #include "module-echo-cancel-symdef.h"
57
58 PA_MODULE_AUTHOR("Wim Taymans");
59 PA_MODULE_DESCRIPTION("Echo Cancellation");
60 PA_MODULE_VERSION(PACKAGE_VERSION);
61 PA_MODULE_LOAD_ONCE(FALSE);
62 PA_MODULE_USAGE(
63         _("source_name=<name for the source> "
64           "source_properties=<properties for the source> "
65           "source_master=<name of source to filter> "
66           "sink_name=<name for the sink> "
67           "sink_properties=<properties for the sink> "
68           "sink_master=<name of sink to filter> "
69           "adjust_time=<how often to readjust rates in s> "
70           "adjust_threshold=<how much drift to readjust after in ms> "
71           "format=<sample format> "
72           "rate=<sample rate> "
73           "channels=<number of channels> "
74           "channel_map=<channel map> "
75           "aec_method=<implementation to use> "
76           "aec_args=<parameters for the AEC engine> "
77           "save_aec=<save AEC data in /tmp> "
78           "autoloaded=<set if this module is being loaded automatically> "
79           "use_volume_sharing=<yes or no> "
80         ));
81
82 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
83 typedef enum {
84     PA_ECHO_CANCELLER_INVALID = -1,
85     PA_ECHO_CANCELLER_NULL,
86 #ifdef HAVE_SPEEX
87     PA_ECHO_CANCELLER_SPEEX,
88 #endif
89 #ifdef HAVE_ADRIAN_EC
90     PA_ECHO_CANCELLER_ADRIAN,
91 #endif
92 #ifdef HAVE_WEBRTC
93     PA_ECHO_CANCELLER_WEBRTC,
94 #endif
95 } pa_echo_canceller_method_t;
96
97 #ifdef HAVE_WEBRTC
98 #define DEFAULT_ECHO_CANCELLER "webrtc"
99 #else
100 #define DEFAULT_ECHO_CANCELLER "speex"
101 #endif
102
103 static const pa_echo_canceller ec_table[] = {
104     {
105         /* Null, Dummy echo canceller (just copies data) */
106         .init                   = pa_null_ec_init,
107         .run                    = pa_null_ec_run,
108         .done                   = pa_null_ec_done,
109     },
110 #ifdef HAVE_SPEEX
111     {
112         /* Speex */
113         .init                   = pa_speex_ec_init,
114         .run                    = pa_speex_ec_run,
115         .done                   = pa_speex_ec_done,
116     },
117 #endif
118 #ifdef HAVE_ADRIAN_EC
119     {
120         /* Adrian Andre's NLMS implementation */
121         .init                   = pa_adrian_ec_init,
122         .run                    = pa_adrian_ec_run,
123         .done                   = pa_adrian_ec_done,
124     },
125 #endif
126 #ifdef HAVE_WEBRTC
127     {
128         /* WebRTC's audio processing engine */
129         .init                   = pa_webrtc_ec_init,
130         .play                   = pa_webrtc_ec_play,
131         .record                 = pa_webrtc_ec_record,
132         .set_drift              = pa_webrtc_ec_set_drift,
133         .run                    = pa_webrtc_ec_run,
134         .done                   = pa_webrtc_ec_done,
135     },
136 #endif
137 };
138
139 #define DEFAULT_RATE 32000
140 #define DEFAULT_CHANNELS 1
141 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
142 #define DEFAULT_ADJUST_TOLERANCE (5*PA_USEC_PER_MSEC)
143 #define DEFAULT_SAVE_AEC FALSE
144 #define DEFAULT_AUTOLOADED FALSE
145
146 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
147
148 /* Can only be used in main context */
149 #define IS_ACTIVE(u) ((pa_source_get_state((u)->source) == PA_SOURCE_RUNNING) && \
150                       (pa_sink_get_state((u)->sink) == PA_SINK_RUNNING))
151
152 /* This module creates a new (virtual) source and sink.
153  *
154  * The data sent to the new sink is kept in a memblockq before being
155  * forwarded to the real sink_master.
156  *
157  * Data read from source_master is matched against the saved sink data and
158  * echo canceled data is then pushed onto the new source.
159  *
160  * Both source and sink masters have their own threads to push/pull data
161  * respectively. We however perform all our actions in the source IO thread.
162  * To do this we send all played samples to the source IO thread where they
163  * are then pushed into the memblockq.
164  *
165  * Alignment is performed in two steps:
166  *
167  * 1) when something happens that requires quick adjustment of the alignment of
168  *    capture and playback samples, we perform a resync. This adjusts the
169  *    position in the playback memblock to the requested sample. Quick
170  *    adjustments include moving the playback samples before the capture
171  *    samples (because else the echo canceler does not work) or when the
172  *    playback pointer drifts too far away.
173  *
174  * 2) periodically check the difference between capture and playback. We use a
175  *    low and high watermark for adjusting the alignment. Playback should always
176  *    be before capture and the difference should not be bigger than one frame
177  *    size. We would ideally like to resample the sink_input but most driver
178  *    don't give enough accuracy to be able to do that right now.
179  */
180
181 struct userdata;
182
183 struct pa_echo_canceller_msg {
184     pa_msgobject parent;
185     struct userdata *userdata;
186 };
187
188 PA_DEFINE_PRIVATE_CLASS(pa_echo_canceller_msg, pa_msgobject);
189 #define PA_ECHO_CANCELLER_MSG(o) (pa_echo_canceller_msg_cast(o))
190
191 struct snapshot {
192     pa_usec_t sink_now;
193     pa_usec_t sink_latency;
194     size_t sink_delay;
195     int64_t send_counter;
196
197     pa_usec_t source_now;
198     pa_usec_t source_latency;
199     size_t source_delay;
200     int64_t recv_counter;
201     size_t rlen;
202     size_t plen;
203 };
204
205 struct userdata {
206     pa_core *core;
207     pa_module *module;
208
209     pa_bool_t autoloaded;
210     pa_bool_t dead;
211     pa_bool_t save_aec;
212
213     pa_echo_canceller *ec;
214     uint32_t source_blocksize;
215     uint32_t sink_blocksize;
216
217     pa_bool_t need_realign;
218
219     /* to wakeup the source I/O thread */
220     pa_asyncmsgq *asyncmsgq;
221     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
222
223     pa_source *source;
224     pa_bool_t source_auto_desc;
225     pa_source_output *source_output;
226     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
227     size_t source_skip;
228
229     pa_sink *sink;
230     pa_bool_t sink_auto_desc;
231     pa_sink_input *sink_input;
232     pa_memblockq *sink_memblockq;
233     int64_t send_counter;          /* updated in sink IO thread */
234     int64_t recv_counter;
235     size_t sink_skip;
236
237     /* Bytes left over from previous iteration */
238     size_t sink_rem;
239     size_t source_rem;
240
241     pa_atomic_t request_resync;
242
243     pa_time_event *time_event;
244     pa_usec_t adjust_time;
245     int adjust_threshold;
246
247     FILE *captured_file;
248     FILE *played_file;
249     FILE *canceled_file;
250     FILE *drift_file;
251
252     pa_bool_t use_volume_sharing;
253
254     struct {
255         pa_cvolume current_volume;
256     } thread_info;
257 };
258
259 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
260
261 static const char* const valid_modargs[] = {
262     "source_name",
263     "source_properties",
264     "source_master",
265     "sink_name",
266     "sink_properties",
267     "sink_master",
268     "adjust_time",
269     "adjust_threshold",
270     "format",
271     "rate",
272     "channels",
273     "channel_map",
274     "aec_method",
275     "aec_args",
276     "save_aec",
277     "autoloaded",
278     "use_volume_sharing",
279     NULL
280 };
281
282 enum {
283     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
284     SOURCE_OUTPUT_MESSAGE_REWIND,
285     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
286     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
287 };
288
289 enum {
290     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
291 };
292
293 enum {
294     ECHO_CANCELLER_MESSAGE_SET_VOLUME,
295 };
296
297 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
298     int64_t diff_time, buffer_latency;
299     pa_usec_t plen, rlen, source_delay, sink_delay, recv_counter, send_counter;
300
301     /* get latency difference between playback and record */
302     plen = pa_bytes_to_usec(snapshot->plen, &u->sink_input->sample_spec);
303     rlen = pa_bytes_to_usec(snapshot->rlen, &u->source_output->sample_spec);
304     if (plen > rlen)
305         buffer_latency = plen - rlen;
306     else
307         buffer_latency = 0;
308
309     source_delay = pa_bytes_to_usec(snapshot->source_delay, &u->source_output->sample_spec);
310     sink_delay = pa_bytes_to_usec(snapshot->sink_delay, &u->sink_input->sample_spec);
311     buffer_latency += source_delay + sink_delay;
312
313     /* add the latency difference due to samples not yet transferred */
314     send_counter = pa_bytes_to_usec(snapshot->send_counter, &u->sink_input->sample_spec);
315     recv_counter = pa_bytes_to_usec(snapshot->recv_counter, &u->source_output->sample_spec);
316     if (recv_counter <= send_counter)
317         buffer_latency += (int64_t) (send_counter - recv_counter);
318     else
319         buffer_latency += PA_CLIP_SUB(buffer_latency, (int64_t) (recv_counter - send_counter));
320
321     /* capture and playback are perfectly aligned when diff_time is 0 */
322     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
323           (snapshot->source_now - snapshot->source_latency);
324
325     pa_log_debug("Diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
326         (long long) snapshot->sink_latency,
327         (long long) buffer_latency, (long long) snapshot->source_latency,
328         (long long) source_delay, (long long) sink_delay,
329         (long long) (send_counter - recv_counter),
330         (long long) (snapshot->sink_now - snapshot->source_now));
331
332     return diff_time;
333 }
334
335 /* Called from main context */
336 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
337     struct userdata *u = userdata;
338     uint32_t old_rate, base_rate, new_rate;
339     int64_t diff_time;
340     /*size_t fs*/
341     struct snapshot latency_snapshot;
342
343     pa_assert(u);
344     pa_assert(a);
345     pa_assert(u->time_event == e);
346     pa_assert_ctl_context();
347
348     if (!IS_ACTIVE(u))
349         return;
350
351     /* update our snapshots */
352     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
353     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
354
355     /* calculate drift between capture and playback */
356     diff_time = calc_diff(u, &latency_snapshot);
357
358     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
359     old_rate = u->sink_input->sample_spec.rate;
360     base_rate = u->source_output->sample_spec.rate;
361
362     if (diff_time < 0) {
363         /* recording before playback, we need to adjust quickly. The echo
364          * canceler does not work in this case. */
365         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
366             NULL, diff_time, NULL, NULL);
367         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
368         new_rate = base_rate;
369     }
370     else {
371         if (diff_time > u->adjust_threshold) {
372             /* diff too big, quickly adjust */
373             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
374                 NULL, diff_time, NULL, NULL);
375         }
376
377         /* recording behind playback, we need to slowly adjust the rate to match */
378         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
379
380         /* assume equal samplerates for now */
381         new_rate = base_rate;
382     }
383
384     /* make sure we don't make too big adjustments because that sounds horrible */
385     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
386         new_rate = base_rate;
387
388     if (new_rate != old_rate) {
389         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
390
391         pa_sink_input_set_rate(u->sink_input, new_rate);
392     }
393
394     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
395 }
396
397 /* Called from source I/O thread context */
398 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
399     struct userdata *u = PA_SOURCE(o)->userdata;
400
401     switch (code) {
402
403         case PA_SOURCE_MESSAGE_GET_LATENCY:
404
405             /* The source is _put() before the source output is, so let's
406              * make sure we don't access it in that time. Also, the
407              * source output is first shut down, the source second. */
408             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
409                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
410                 *((pa_usec_t*) data) = 0;
411                 return 0;
412             }
413
414             *((pa_usec_t*) data) =
415
416                 /* Get the latency of the master source */
417                 pa_source_get_latency_within_thread(u->source_output->source) +
418                 /* Add the latency internal to our source output on top */
419                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
420                 /* and the buffering we do on the source */
421                 pa_bytes_to_usec(u->source_blocksize, &u->source_output->source->sample_spec);
422
423             return 0;
424
425         case PA_SOURCE_MESSAGE_SET_VOLUME_SYNCED:
426             u->thread_info.current_volume = u->source->reference_volume;
427             break;
428     }
429
430     return pa_source_process_msg(o, code, data, offset, chunk);
431 }
432
433 /* Called from sink I/O thread context */
434 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
435     struct userdata *u = PA_SINK(o)->userdata;
436
437     switch (code) {
438
439         case PA_SINK_MESSAGE_GET_LATENCY:
440
441             /* The sink is _put() before the sink input is, so let's
442              * make sure we don't access it in that time. Also, the
443              * sink input is first shut down, the sink second. */
444             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
445                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
446                 *((pa_usec_t*) data) = 0;
447                 return 0;
448             }
449
450             *((pa_usec_t*) data) =
451
452                 /* Get the latency of the master sink */
453                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
454
455                 /* Add the latency internal to our sink input on top */
456                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
457
458             return 0;
459     }
460
461     return pa_sink_process_msg(o, code, data, offset, chunk);
462 }
463
464
465 /* Called from main context */
466 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
467     struct userdata *u;
468
469     pa_source_assert_ref(s);
470     pa_assert_se(u = s->userdata);
471
472     if (!PA_SOURCE_IS_LINKED(state) ||
473         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
474         return 0;
475
476     if (state == PA_SOURCE_RUNNING) {
477         /* restart timer when both sink and source are active */
478         if (IS_ACTIVE(u) && u->adjust_time)
479             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
480
481         pa_atomic_store(&u->request_resync, 1);
482         pa_source_output_cork(u->source_output, FALSE);
483     } else if (state == PA_SOURCE_SUSPENDED) {
484         pa_source_output_cork(u->source_output, TRUE);
485     }
486
487     return 0;
488 }
489
490 /* Called from main context */
491 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
492     struct userdata *u;
493
494     pa_sink_assert_ref(s);
495     pa_assert_se(u = s->userdata);
496
497     if (!PA_SINK_IS_LINKED(state) ||
498         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
499         return 0;
500
501     if (state == PA_SINK_RUNNING) {
502         /* restart timer when both sink and source are active */
503         if (IS_ACTIVE(u) && u->adjust_time)
504             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
505
506         pa_atomic_store(&u->request_resync, 1);
507         pa_sink_input_cork(u->sink_input, FALSE);
508     } else if (state == PA_SINK_SUSPENDED) {
509         pa_sink_input_cork(u->sink_input, TRUE);
510     }
511
512     return 0;
513 }
514
515 /* Called from source I/O thread context */
516 static void source_update_requested_latency_cb(pa_source *s) {
517     struct userdata *u;
518
519     pa_source_assert_ref(s);
520     pa_assert_se(u = s->userdata);
521
522     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
523         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
524         return;
525
526     pa_log_debug("Source update requested latency");
527
528     /* Just hand this one over to the master source */
529     pa_source_output_set_requested_latency_within_thread(
530             u->source_output,
531             pa_source_get_requested_latency_within_thread(s));
532 }
533
534 /* Called from sink I/O thread context */
535 static void sink_update_requested_latency_cb(pa_sink *s) {
536     struct userdata *u;
537
538     pa_sink_assert_ref(s);
539     pa_assert_se(u = s->userdata);
540
541     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
542         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
543         return;
544
545     pa_log_debug("Sink update requested latency");
546
547     /* Just hand this one over to the master sink */
548     pa_sink_input_set_requested_latency_within_thread(
549             u->sink_input,
550             pa_sink_get_requested_latency_within_thread(s));
551 }
552
553 /* Called from sink I/O thread context */
554 static void sink_request_rewind_cb(pa_sink *s) {
555     struct userdata *u;
556
557     pa_sink_assert_ref(s);
558     pa_assert_se(u = s->userdata);
559
560     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
561         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
562         return;
563
564     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
565
566     /* Just hand this one over to the master sink */
567     pa_sink_input_request_rewind(u->sink_input,
568                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
569 }
570
571 /* Called from main context */
572 static void source_set_volume_cb(pa_source *s) {
573     struct userdata *u;
574
575     pa_source_assert_ref(s);
576     pa_assert_se(u = s->userdata);
577
578     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
579         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
580         return;
581
582     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
583 }
584
585 /* Called from main context */
586 static void sink_set_volume_cb(pa_sink *s) {
587     struct userdata *u;
588
589     pa_sink_assert_ref(s);
590     pa_assert_se(u = s->userdata);
591
592     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
593         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
594         return;
595
596     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
597 }
598
599 /* Called from main context. */
600 static void source_get_volume_cb(pa_source *s) {
601     struct userdata *u;
602     pa_cvolume v;
603
604     pa_source_assert_ref(s);
605     pa_assert_se(u = s->userdata);
606
607     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
608         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
609         return;
610
611     pa_source_output_get_volume(u->source_output, &v, TRUE);
612
613     if (pa_cvolume_equal(&s->real_volume, &v))
614         /* no change */
615         return;
616
617     s->real_volume = v;
618     pa_source_set_soft_volume(s, NULL);
619 }
620
621 /* Called from main context */
622 static void source_set_mute_cb(pa_source *s) {
623     struct userdata *u;
624
625     pa_source_assert_ref(s);
626     pa_assert_se(u = s->userdata);
627
628     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
629         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
630         return;
631
632     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
633 }
634
635 /* Called from main context */
636 static void sink_set_mute_cb(pa_sink *s) {
637     struct userdata *u;
638
639     pa_sink_assert_ref(s);
640     pa_assert_se(u = s->userdata);
641
642     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
643         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
644         return;
645
646     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
647 }
648
649 /* Called from main context */
650 static void source_get_mute_cb(pa_source *s) {
651     struct userdata *u;
652
653     pa_source_assert_ref(s);
654     pa_assert_se(u = s->userdata);
655
656     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
657         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
658         return;
659
660     pa_source_output_get_mute(u->source_output);
661 }
662
663 /* Called from source I/O thread context. */
664 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
665     int64_t diff;
666
667     if (diff_time < 0) {
668         diff = pa_usec_to_bytes(-diff_time, &u->sink_input->sample_spec);
669
670         if (diff > 0) {
671             /* add some extra safety samples to compensate for jitter in the
672              * timings */
673             diff += 10 * pa_frame_size (&u->sink_input->sample_spec);
674
675             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
676
677             u->sink_skip = diff;
678             u->source_skip = 0;
679         }
680     } else if (diff_time > 0) {
681         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
682
683         if (diff > 0) {
684             pa_log("Playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
685
686             u->source_skip = diff;
687             u->sink_skip = 0;
688         }
689     }
690 }
691
692 /* Called from source I/O thread context. */
693 static void do_resync(struct userdata *u) {
694     int64_t diff_time;
695     struct snapshot latency_snapshot;
696
697     pa_log("Doing resync");
698
699     /* update our snapshot */
700     source_output_snapshot_within_thread(u, &latency_snapshot);
701     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
702
703     /* calculate drift between capture and playback */
704     diff_time = calc_diff(u, &latency_snapshot);
705
706     /* and adjust for the drift */
707     apply_diff_time(u, diff_time);
708 }
709
710 /* 1. Calculate drift at this point, pass to canceller
711  * 2. Push out playback samples in blocksize chunks
712  * 3. Push out capture samples in blocksize chunks
713  * 4. ???
714  * 5. Profit
715  *
716  * Called from source I/O thread context.
717  */
718 static void do_push_drift_comp(struct userdata *u) {
719     size_t rlen, plen;
720     pa_memchunk rchunk, pchunk, cchunk;
721     uint8_t *rdata, *pdata, *cdata;
722     float drift;
723     int unused PA_GCC_UNUSED;
724
725     rlen = pa_memblockq_get_length(u->source_memblockq);
726     plen = pa_memblockq_get_length(u->sink_memblockq);
727
728     /* Estimate snapshot drift as follows:
729      *   pd: amount of data consumed since last time
730      *   rd: amount of data consumed since last time
731      *
732      *   drift = (pd - rd) / rd;
733      *
734      * We calculate pd and rd as the memblockq length less the number of
735      * samples left from the last iteration (to avoid double counting
736      * those remainder samples.
737      */
738     drift = ((float)(plen - u->sink_rem) - (rlen - u->source_rem)) / ((float)(rlen - u->source_rem));
739     u->sink_rem = plen % u->sink_blocksize;
740     u->source_rem = rlen % u->source_blocksize;
741
742     /* Now let the canceller work its drift compensation magic */
743     u->ec->set_drift(u->ec, drift);
744
745     if (u->save_aec) {
746         if (u->drift_file)
747             fprintf(u->drift_file, "d %a\n", drift);
748     }
749
750     /* Send in the playback samples first */
751     while (plen >= u->sink_blocksize) {
752         pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
753         pdata = pa_memblock_acquire(pchunk.memblock);
754         pdata += pchunk.index;
755
756         u->ec->play(u->ec, pdata);
757
758         if (u->save_aec) {
759             if (u->drift_file)
760                 fprintf(u->drift_file, "p %d\n", u->sink_blocksize);
761             if (u->played_file)
762                 unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
763         }
764
765         pa_memblock_release(pchunk.memblock);
766         pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
767         pa_memblock_unref(pchunk.memblock);
768
769         plen -= u->sink_blocksize;
770     }
771
772     /* And now the capture samples */
773     while (rlen >= u->source_blocksize) {
774         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_blocksize, &rchunk);
775
776         rdata = pa_memblock_acquire(rchunk.memblock);
777         rdata += rchunk.index;
778
779         cchunk.index = 0;
780         cchunk.length = u->source_blocksize;
781         cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
782         cdata = pa_memblock_acquire(cchunk.memblock);
783
784         u->ec->record(u->ec, rdata, cdata);
785
786         if (u->save_aec) {
787             if (u->drift_file)
788                 fprintf(u->drift_file, "c %d\n", u->source_blocksize);
789             if (u->captured_file)
790                 unused = fwrite(rdata, 1, u->source_blocksize, u->captured_file);
791             if (u->canceled_file)
792                 unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
793         }
794
795         pa_memblock_release(cchunk.memblock);
796         pa_memblock_release(rchunk.memblock);
797
798         pa_memblock_unref(rchunk.memblock);
799
800         pa_source_post(u->source, &cchunk);
801         pa_memblock_unref(cchunk.memblock);
802
803         pa_memblockq_drop(u->source_memblockq, u->source_blocksize);
804         rlen -= u->source_blocksize;
805     }
806 }
807
808 /* This one's simpler than the drift compensation case -- we just iterate over
809  * the capture buffer, and pass the canceller blocksize bytes of playback and
810  * capture data.
811  *
812  * Called from source I/O thread context. */
813 static void do_push(struct userdata *u) {
814     size_t rlen, plen;
815     pa_memchunk rchunk, pchunk, cchunk;
816     uint8_t *rdata, *pdata, *cdata;
817     int unused PA_GCC_UNUSED;
818
819     rlen = pa_memblockq_get_length(u->source_memblockq);
820     plen = pa_memblockq_get_length(u->sink_memblockq);
821
822     while (rlen >= u->source_blocksize) {
823         /* take fixed block from recorded samples */
824         pa_memblockq_peek_fixed_size(u->source_memblockq, u->source_blocksize, &rchunk);
825
826         if (plen >= u->sink_blocksize) {
827             /* take fixed block from played samples */
828             pa_memblockq_peek_fixed_size(u->sink_memblockq, u->sink_blocksize, &pchunk);
829
830             rdata = pa_memblock_acquire(rchunk.memblock);
831             rdata += rchunk.index;
832             pdata = pa_memblock_acquire(pchunk.memblock);
833             pdata += pchunk.index;
834
835             cchunk.index = 0;
836             cchunk.length = u->source_blocksize;
837             cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
838             cdata = pa_memblock_acquire(cchunk.memblock);
839
840             if (u->save_aec) {
841                 if (u->captured_file)
842                     unused = fwrite(rdata, 1, u->source_blocksize, u->captured_file);
843                 if (u->played_file)
844                     unused = fwrite(pdata, 1, u->sink_blocksize, u->played_file);
845             }
846
847             /* perform echo cancellation */
848             u->ec->run(u->ec, rdata, pdata, cdata);
849
850             if (u->save_aec) {
851                 if (u->canceled_file)
852                     unused = fwrite(cdata, 1, u->source_blocksize, u->canceled_file);
853             }
854
855             pa_memblock_release(cchunk.memblock);
856             pa_memblock_release(pchunk.memblock);
857             pa_memblock_release(rchunk.memblock);
858
859             /* drop consumed sink samples */
860             pa_memblockq_drop(u->sink_memblockq, u->sink_blocksize);
861             pa_memblock_unref(pchunk.memblock);
862
863             pa_memblock_unref(rchunk.memblock);
864             /* the filtered samples now become the samples from our
865              * source */
866             rchunk = cchunk;
867
868             plen -= u->sink_blocksize;
869         }
870
871         /* forward the (echo-canceled) data to the virtual source */
872         pa_source_post(u->source, &rchunk);
873         pa_memblock_unref(rchunk.memblock);
874
875         pa_memblockq_drop(u->source_memblockq, u->source_blocksize);
876         rlen -= u->source_blocksize;
877     }
878 }
879
880 /* Called from source I/O thread context. */
881 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
882     struct userdata *u;
883     size_t rlen, plen, to_skip;
884     pa_memchunk rchunk;
885
886     pa_source_output_assert_ref(o);
887     pa_source_output_assert_io_context(o);
888     pa_assert_se(u = o->userdata);
889
890     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
891         pa_log("Push when no link?");
892         return;
893     }
894
895     if (PA_UNLIKELY(u->source->thread_info.state != PA_SOURCE_RUNNING ||
896                     u->sink->thread_info.state != PA_SINK_RUNNING)) {
897         pa_source_post(u->source, chunk);
898         return;
899     }
900
901     /* handle queued messages, do any message sending of our own */
902     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
903         ;
904
905     pa_memblockq_push_align(u->source_memblockq, chunk);
906
907     rlen = pa_memblockq_get_length(u->source_memblockq);
908     plen = pa_memblockq_get_length(u->sink_memblockq);
909
910     /* Let's not do anything else till we have enough data to process */
911     if (rlen < u->source_blocksize)
912         return;
913
914     /* See if we need to drop samples in order to sync */
915     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
916         do_resync(u);
917     }
918
919     /* Okay, skip cancellation for skipped source samples if needed. */
920     if (PA_UNLIKELY(u->source_skip)) {
921         /* The slightly tricky bit here is that we drop all but modulo
922          * blocksize bytes and then adjust for that last bit on the sink side.
923          * We do this because the source data is coming at a fixed rate, which
924          * means the only way to try to catch up is drop sink samples and let
925          * the canceller cope up with this. */
926         to_skip = rlen >= u->source_skip ? u->source_skip : rlen;
927         to_skip -= to_skip % u->source_blocksize;
928
929         if (to_skip) {
930             pa_memblockq_peek_fixed_size(u->source_memblockq, to_skip, &rchunk);
931             pa_source_post(u->source, &rchunk);
932
933             pa_memblock_unref(rchunk.memblock);
934             pa_memblockq_drop(u->source_memblockq, to_skip);
935
936             rlen -= to_skip;
937             u->source_skip -= to_skip;
938         }
939
940         if (rlen && u->source_skip % u->source_blocksize) {
941             u->sink_skip += (uint64_t) (u->source_blocksize - (u->source_skip % u->source_blocksize)) * u->sink_blocksize / u->source_blocksize;
942             u->source_skip -= (u->source_skip % u->source_blocksize);
943         }
944     }
945
946     /* And for the sink, these samples have been played back already, so we can
947      * just drop them and get on with it. */
948     if (PA_UNLIKELY(u->sink_skip)) {
949         to_skip = plen >= u->sink_skip ? u->sink_skip : plen;
950
951         pa_memblockq_drop(u->sink_memblockq, to_skip);
952
953         plen -= to_skip;
954         u->sink_skip -= to_skip;
955     }
956
957     /* process and push out samples */
958     if (u->ec->params.drift_compensation)
959         do_push_drift_comp(u);
960     else
961         do_push(u);
962 }
963
964 /* Called from sink I/O thread context. */
965 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
966     struct userdata *u;
967
968     pa_sink_input_assert_ref(i);
969     pa_assert(chunk);
970     pa_assert_se(u = i->userdata);
971
972     if (u->sink->thread_info.rewind_requested)
973         pa_sink_process_rewind(u->sink, 0);
974
975     pa_sink_render_full(u->sink, nbytes, chunk);
976
977     if (i->thread_info.underrun_for > 0) {
978         pa_log_debug("Handling end of underrun.");
979         pa_atomic_store(&u->request_resync, 1);
980     }
981
982     /* let source thread handle the chunk. pass the sample count as well so that
983      * the source IO thread can update the right variables. */
984     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
985         NULL, 0, chunk, NULL);
986     u->send_counter += chunk->length;
987
988     return 0;
989 }
990
991 /* Called from source I/O thread context. */
992 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
993     struct userdata *u;
994
995     pa_source_output_assert_ref(o);
996     pa_source_output_assert_io_context(o);
997     pa_assert_se(u = o->userdata);
998
999     pa_source_process_rewind(u->source, nbytes);
1000
1001     /* go back on read side, we need to use older sink data for this */
1002     pa_memblockq_rewind(u->sink_memblockq, nbytes);
1003
1004     /* manipulate write index */
1005     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
1006
1007     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
1008         (long long) pa_memblockq_get_length (u->source_memblockq));
1009 }
1010
1011 /* Called from sink I/O thread context. */
1012 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
1013     struct userdata *u;
1014
1015     pa_sink_input_assert_ref(i);
1016     pa_assert_se(u = i->userdata);
1017
1018     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
1019
1020     pa_sink_process_rewind(u->sink, nbytes);
1021
1022     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
1023     u->send_counter -= nbytes;
1024 }
1025
1026 /* Called from source I/O thread context. */
1027 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
1028     size_t delay, rlen, plen;
1029     pa_usec_t now, latency;
1030
1031     now = pa_rtclock_now();
1032     latency = pa_source_get_latency_within_thread(u->source_output->source);
1033     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
1034
1035     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
1036     rlen = pa_memblockq_get_length(u->source_memblockq);
1037     plen = pa_memblockq_get_length(u->sink_memblockq);
1038
1039     snapshot->source_now = now;
1040     snapshot->source_latency = latency;
1041     snapshot->source_delay = delay;
1042     snapshot->recv_counter = u->recv_counter;
1043     snapshot->rlen = rlen + u->sink_skip;
1044     snapshot->plen = plen + u->source_skip;
1045 }
1046
1047 /* Called from source I/O thread context. */
1048 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1049     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
1050
1051     switch (code) {
1052
1053         case SOURCE_OUTPUT_MESSAGE_POST:
1054
1055             pa_source_output_assert_io_context(u->source_output);
1056
1057             if (u->source_output->source->thread_info.state == PA_SOURCE_RUNNING)
1058                 pa_memblockq_push_align(u->sink_memblockq, chunk);
1059             else
1060                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1061
1062             u->recv_counter += (int64_t) chunk->length;
1063
1064             return 0;
1065
1066         case SOURCE_OUTPUT_MESSAGE_REWIND:
1067             pa_source_output_assert_io_context(u->source_output);
1068
1069             /* manipulate write index, never go past what we have */
1070             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
1071                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
1072             else
1073                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
1074
1075             pa_log_debug("Sink rewind (%lld)", (long long) offset);
1076
1077             u->recv_counter -= offset;
1078
1079             return 0;
1080
1081         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
1082             struct snapshot *snapshot = (struct snapshot *) data;
1083
1084             source_output_snapshot_within_thread(u, snapshot);
1085             return 0;
1086         }
1087
1088         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
1089             apply_diff_time(u, offset);
1090             return 0;
1091
1092     }
1093
1094     return pa_source_output_process_msg(obj, code, data, offset, chunk);
1095 }
1096
1097 /* Called from sink I/O thread context. */
1098 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
1099     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
1100
1101     switch (code) {
1102
1103         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
1104             size_t delay;
1105             pa_usec_t now, latency;
1106             struct snapshot *snapshot = (struct snapshot *) data;
1107
1108             pa_sink_input_assert_io_context(u->sink_input);
1109
1110             now = pa_rtclock_now();
1111             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
1112             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
1113
1114             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
1115
1116             snapshot->sink_now = now;
1117             snapshot->sink_latency = latency;
1118             snapshot->sink_delay = delay;
1119             snapshot->send_counter = u->send_counter;
1120             return 0;
1121         }
1122     }
1123
1124     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
1125 }
1126
1127 /* Called from sink I/O thread context. */
1128 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
1129     struct userdata *u;
1130
1131     pa_sink_input_assert_ref(i);
1132     pa_assert_se(u = i->userdata);
1133
1134     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
1135
1136     /* FIXME: Too small max_rewind:
1137      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1138     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
1139     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
1140 }
1141
1142 /* Called from source I/O thread context. */
1143 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
1144     struct userdata *u;
1145
1146     pa_source_output_assert_ref(o);
1147     pa_assert_se(u = o->userdata);
1148
1149     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
1150
1151     pa_source_set_max_rewind_within_thread(u->source, nbytes);
1152 }
1153
1154 /* Called from sink I/O thread context. */
1155 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
1156     struct userdata *u;
1157
1158     pa_sink_input_assert_ref(i);
1159     pa_assert_se(u = i->userdata);
1160
1161     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
1162
1163     pa_sink_set_max_request_within_thread(u->sink, nbytes);
1164 }
1165
1166 /* Called from sink I/O thread context. */
1167 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
1168     struct userdata *u;
1169     pa_usec_t latency;
1170
1171     pa_sink_input_assert_ref(i);
1172     pa_assert_se(u = i->userdata);
1173
1174     latency = pa_sink_get_requested_latency_within_thread(i->sink);
1175
1176     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
1177 }
1178
1179 /* Called from source I/O thread context. */
1180 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
1181     struct userdata *u;
1182     pa_usec_t latency;
1183
1184     pa_source_output_assert_ref(o);
1185     pa_assert_se(u = o->userdata);
1186
1187     latency = pa_source_get_requested_latency_within_thread(o->source);
1188
1189     pa_log_debug("Source output update requested latency %lld", (long long) latency);
1190 }
1191
1192 /* Called from sink I/O thread context. */
1193 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1194     struct userdata *u;
1195
1196     pa_sink_input_assert_ref(i);
1197     pa_assert_se(u = i->userdata);
1198
1199     pa_log_debug("Sink input update latency range %lld %lld",
1200         (long long) i->sink->thread_info.min_latency,
1201         (long long) i->sink->thread_info.max_latency);
1202
1203     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1204 }
1205
1206 /* Called from source I/O thread context. */
1207 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1208     struct userdata *u;
1209
1210     pa_source_output_assert_ref(o);
1211     pa_assert_se(u = o->userdata);
1212
1213     pa_log_debug("Source output update latency range %lld %lld",
1214         (long long) o->source->thread_info.min_latency,
1215         (long long) o->source->thread_info.max_latency);
1216
1217     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1218 }
1219
1220 /* Called from sink I/O thread context. */
1221 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1222     struct userdata *u;
1223
1224     pa_sink_input_assert_ref(i);
1225     pa_assert_se(u = i->userdata);
1226
1227     pa_log_debug("Sink input update fixed latency %lld",
1228         (long long) i->sink->thread_info.fixed_latency);
1229
1230     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1231 }
1232
1233 /* Called from source I/O thread context. */
1234 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1235     struct userdata *u;
1236
1237     pa_source_output_assert_ref(o);
1238     pa_assert_se(u = o->userdata);
1239
1240     pa_log_debug("Source output update fixed latency %lld",
1241         (long long) o->source->thread_info.fixed_latency);
1242
1243     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1244 }
1245
1246 /* Called from source I/O thread context. */
1247 static void source_output_attach_cb(pa_source_output *o) {
1248     struct userdata *u;
1249
1250     pa_source_output_assert_ref(o);
1251     pa_source_output_assert_io_context(o);
1252     pa_assert_se(u = o->userdata);
1253
1254     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1255     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1256     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1257     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1258
1259     pa_log_debug("Source output %d attach", o->index);
1260
1261     pa_source_attach_within_thread(u->source);
1262
1263     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1264             o->source->thread_info.rtpoll,
1265             PA_RTPOLL_LATE,
1266             u->asyncmsgq);
1267 }
1268
1269 /* Called from sink I/O thread context. */
1270 static void sink_input_attach_cb(pa_sink_input *i) {
1271     struct userdata *u;
1272
1273     pa_sink_input_assert_ref(i);
1274     pa_assert_se(u = i->userdata);
1275
1276     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1277     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1278
1279     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1280      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1281     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1282
1283     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1284      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1285      * HERE. SEE (6) */
1286     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1287
1288     /* FIXME: Too small max_rewind:
1289      * https://bugs.freedesktop.org/show_bug.cgi?id=53709 */
1290     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1291
1292     pa_log_debug("Sink input %d attach", i->index);
1293
1294     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1295             i->sink->thread_info.rtpoll,
1296             PA_RTPOLL_LATE,
1297             u->asyncmsgq);
1298
1299     pa_sink_attach_within_thread(u->sink);
1300 }
1301
1302
1303 /* Called from source I/O thread context. */
1304 static void source_output_detach_cb(pa_source_output *o) {
1305     struct userdata *u;
1306
1307     pa_source_output_assert_ref(o);
1308     pa_source_output_assert_io_context(o);
1309     pa_assert_se(u = o->userdata);
1310
1311     pa_source_detach_within_thread(u->source);
1312     pa_source_set_rtpoll(u->source, NULL);
1313
1314     pa_log_debug("Source output %d detach", o->index);
1315
1316     if (u->rtpoll_item_read) {
1317         pa_rtpoll_item_free(u->rtpoll_item_read);
1318         u->rtpoll_item_read = NULL;
1319     }
1320 }
1321
1322 /* Called from sink I/O thread context. */
1323 static void sink_input_detach_cb(pa_sink_input *i) {
1324     struct userdata *u;
1325
1326     pa_sink_input_assert_ref(i);
1327     pa_assert_se(u = i->userdata);
1328
1329     pa_sink_detach_within_thread(u->sink);
1330
1331     pa_sink_set_rtpoll(u->sink, NULL);
1332
1333     pa_log_debug("Sink input %d detach", i->index);
1334
1335     if (u->rtpoll_item_write) {
1336         pa_rtpoll_item_free(u->rtpoll_item_write);
1337         u->rtpoll_item_write = NULL;
1338     }
1339 }
1340
1341 /* Called from source I/O thread context. */
1342 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1343     struct userdata *u;
1344
1345     pa_source_output_assert_ref(o);
1346     pa_source_output_assert_io_context(o);
1347     pa_assert_se(u = o->userdata);
1348
1349     pa_log_debug("Source output %d state %d", o->index, state);
1350 }
1351
1352 /* Called from sink I/O thread context. */
1353 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1354     struct userdata *u;
1355
1356     pa_sink_input_assert_ref(i);
1357     pa_assert_se(u = i->userdata);
1358
1359     pa_log_debug("Sink input %d state %d", i->index, state);
1360
1361     /* If we are added for the first time, ask for a rewinding so that
1362      * we are heard right-away. */
1363     if (PA_SINK_INPUT_IS_LINKED(state) &&
1364         i->thread_info.state == PA_SINK_INPUT_INIT) {
1365         pa_log_debug("Requesting rewind due to state change.");
1366         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1367     }
1368 }
1369
1370 /* Called from main context. */
1371 static void source_output_kill_cb(pa_source_output *o) {
1372     struct userdata *u;
1373
1374     pa_source_output_assert_ref(o);
1375     pa_assert_ctl_context();
1376     pa_assert_se(u = o->userdata);
1377
1378     u->dead = TRUE;
1379
1380     /* The order here matters! We first kill the source output, followed
1381      * by the source. That means the source callbacks must be protected
1382      * against an unconnected source output! */
1383     pa_source_output_unlink(u->source_output);
1384     pa_source_unlink(u->source);
1385
1386     pa_source_output_unref(u->source_output);
1387     u->source_output = NULL;
1388
1389     pa_source_unref(u->source);
1390     u->source = NULL;
1391
1392     pa_log_debug("Source output kill %d", o->index);
1393
1394     pa_module_unload_request(u->module, TRUE);
1395 }
1396
1397 /* Called from main context */
1398 static void sink_input_kill_cb(pa_sink_input *i) {
1399     struct userdata *u;
1400
1401     pa_sink_input_assert_ref(i);
1402     pa_assert_se(u = i->userdata);
1403
1404     u->dead = TRUE;
1405
1406     /* The order here matters! We first kill the sink input, followed
1407      * by the sink. That means the sink callbacks must be protected
1408      * against an unconnected sink input! */
1409     pa_sink_input_unlink(u->sink_input);
1410     pa_sink_unlink(u->sink);
1411
1412     pa_sink_input_unref(u->sink_input);
1413     u->sink_input = NULL;
1414
1415     pa_sink_unref(u->sink);
1416     u->sink = NULL;
1417
1418     pa_log_debug("Sink input kill %d", i->index);
1419
1420     pa_module_unload_request(u->module, TRUE);
1421 }
1422
1423 /* Called from main context. */
1424 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1425     struct userdata *u;
1426
1427     pa_source_output_assert_ref(o);
1428     pa_assert_ctl_context();
1429     pa_assert_se(u = o->userdata);
1430
1431     if (u->dead || u->autoloaded)
1432         return FALSE;
1433
1434     return (u->source != dest) && (u->sink != dest->monitor_of);
1435 }
1436
1437 /* Called from main context */
1438 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1439     struct userdata *u;
1440
1441     pa_sink_input_assert_ref(i);
1442     pa_assert_se(u = i->userdata);
1443
1444     if (u->dead || u->autoloaded)
1445         return FALSE;
1446
1447     return u->sink != dest;
1448 }
1449
1450 /* Called from main context. */
1451 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1452     struct userdata *u;
1453
1454     pa_source_output_assert_ref(o);
1455     pa_assert_ctl_context();
1456     pa_assert_se(u = o->userdata);
1457
1458     if (dest) {
1459         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1460         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1461     } else
1462         pa_source_set_asyncmsgq(u->source, NULL);
1463
1464     if (u->source_auto_desc && dest) {
1465         const char *y, *z;
1466         pa_proplist *pl;
1467
1468         pl = pa_proplist_new();
1469         y = pa_proplist_gets(u->sink_input->sink->proplist, PA_PROP_DEVICE_DESCRIPTION);
1470         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1471         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1472                 y ? y : u->sink_input->sink->name);
1473
1474         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1475         pa_proplist_free(pl);
1476     }
1477 }
1478
1479 /* Called from main context */
1480 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1481     struct userdata *u;
1482
1483     pa_sink_input_assert_ref(i);
1484     pa_assert_se(u = i->userdata);
1485
1486     if (dest) {
1487         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1488         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1489     } else
1490         pa_sink_set_asyncmsgq(u->sink, NULL);
1491
1492     if (u->sink_auto_desc && dest) {
1493         const char *y, *z;
1494         pa_proplist *pl;
1495
1496         pl = pa_proplist_new();
1497         y = pa_proplist_gets(u->source_output->source->proplist, PA_PROP_DEVICE_DESCRIPTION);
1498         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1499         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)", z ? z : dest->name,
1500                          y ? y : u->source_output->source->name);
1501
1502         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1503         pa_proplist_free(pl);
1504     }
1505 }
1506
1507 /* Called from main context */
1508 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1509     struct userdata *u;
1510
1511     pa_sink_input_assert_ref(i);
1512     pa_assert_se(u = i->userdata);
1513
1514     pa_sink_volume_changed(u->sink, &i->volume);
1515 }
1516
1517 /* Called from main context */
1518 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1519     struct userdata *u;
1520
1521     pa_sink_input_assert_ref(i);
1522     pa_assert_se(u = i->userdata);
1523
1524     pa_sink_mute_changed(u->sink, i->muted);
1525 }
1526
1527 /* Called from main context */
1528 static int canceller_process_msg_cb(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
1529     struct pa_echo_canceller_msg *msg;
1530     struct userdata *u;
1531
1532     pa_assert(o);
1533
1534     msg = PA_ECHO_CANCELLER_MSG(o);
1535     u = msg->userdata;
1536
1537     switch (code) {
1538         case ECHO_CANCELLER_MESSAGE_SET_VOLUME: {
1539             pa_cvolume *v = (pa_cvolume *) userdata;
1540
1541             if (u->use_volume_sharing)
1542                 pa_source_set_volume(u->source, v, TRUE, FALSE);
1543             else
1544                 pa_source_output_set_volume(u->source_output, v, FALSE, TRUE);
1545
1546             break;
1547         }
1548
1549         default:
1550             pa_assert_not_reached();
1551             break;
1552     }
1553
1554     return 0;
1555 }
1556
1557 /* Called by the canceller, so source I/O thread context. */
1558 void pa_echo_canceller_get_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1559     *v = ec->msg->userdata->thread_info.current_volume;
1560 }
1561
1562 /* Called by the canceller, so source I/O thread context. */
1563 void pa_echo_canceller_set_capture_volume(pa_echo_canceller *ec, pa_cvolume *v) {
1564     if (!pa_cvolume_equal(&ec->msg->userdata->thread_info.current_volume, v)) {
1565         pa_cvolume *vol = pa_xnewdup(pa_cvolume, v, 1);
1566
1567         pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(ec->msg), ECHO_CANCELLER_MESSAGE_SET_VOLUME, vol, 0, NULL,
1568                 pa_xfree);
1569     }
1570 }
1571
1572 uint32_t pa_echo_canceller_blocksize_power2(unsigned rate, unsigned ms) {
1573     unsigned nframes = (rate * ms) / 1000;
1574     uint32_t y = 1 << ((8 * sizeof(uint32_t)) - 2);
1575
1576     assert(rate >= 4000);
1577     assert(ms >= 1);
1578
1579     /* nframes should be a power of 2, round down to nearest power of two */
1580     while (y > nframes)
1581         y >>= 1;
1582
1583     assert(y >= 1);
1584     return y;
1585 }
1586
1587 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1588     if (pa_streq(method, "null"))
1589         return PA_ECHO_CANCELLER_NULL;
1590 #ifdef HAVE_SPEEX
1591     if (pa_streq(method, "speex"))
1592         return PA_ECHO_CANCELLER_SPEEX;
1593 #endif
1594 #ifdef HAVE_ADRIAN_EC
1595     if (pa_streq(method, "adrian"))
1596         return PA_ECHO_CANCELLER_ADRIAN;
1597 #endif
1598 #ifdef HAVE_WEBRTC
1599     if (pa_streq(method, "webrtc"))
1600         return PA_ECHO_CANCELLER_WEBRTC;
1601 #endif
1602     return PA_ECHO_CANCELLER_INVALID;
1603 }
1604
1605 /* Common initialisation bits between module-echo-cancel and the standalone
1606  * test program.
1607  *
1608  * Called from main context. */
1609 static int init_common(pa_modargs *ma, struct userdata *u, pa_sample_spec *source_ss, pa_channel_map *source_map) {
1610     const char *ec_string;
1611     pa_echo_canceller_method_t ec_method;
1612
1613     if (pa_modargs_get_sample_spec_and_channel_map(ma, source_ss, source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1614         pa_log("Invalid sample format specification or channel map");
1615         goto fail;
1616     }
1617
1618     u->ec = pa_xnew0(pa_echo_canceller, 1);
1619     if (!u->ec) {
1620         pa_log("Failed to alloc echo canceller");
1621         goto fail;
1622     }
1623
1624     ec_string = pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER);
1625     if ((ec_method = get_ec_method_from_string(ec_string)) < 0) {
1626         pa_log("Invalid echo canceller implementation");
1627         goto fail;
1628     }
1629
1630     pa_log_info("Using AEC engine: %s", ec_string);
1631
1632     u->ec->init = ec_table[ec_method].init;
1633     u->ec->play = ec_table[ec_method].play;
1634     u->ec->record = ec_table[ec_method].record;
1635     u->ec->set_drift = ec_table[ec_method].set_drift;
1636     u->ec->run = ec_table[ec_method].run;
1637     u->ec->done = ec_table[ec_method].done;
1638
1639     return 0;
1640
1641 fail:
1642     return -1;
1643 }
1644
1645 /* Called from main context. */
1646 int pa__init(pa_module*m) {
1647     struct userdata *u;
1648     pa_sample_spec source_ss, sink_ss;
1649     pa_channel_map source_map, sink_map;
1650     pa_modargs *ma;
1651     pa_source *source_master=NULL;
1652     pa_sink *sink_master=NULL;
1653     pa_source_output_new_data source_output_data;
1654     pa_sink_input_new_data sink_input_data;
1655     pa_source_new_data source_data;
1656     pa_sink_new_data sink_data;
1657     pa_memchunk silence;
1658     uint32_t temp;
1659     uint32_t nframes = 0;
1660
1661     pa_assert(m);
1662
1663     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1664         pa_log("Failed to parse module arguments.");
1665         goto fail;
1666     }
1667
1668     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1669         pa_log("Master source not found");
1670         goto fail;
1671     }
1672     pa_assert(source_master);
1673
1674     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1675         pa_log("Master sink not found");
1676         goto fail;
1677     }
1678     pa_assert(sink_master);
1679
1680     if (source_master->monitor_of == sink_master) {
1681         pa_log("Can't cancel echo between a sink and its monitor");
1682         goto fail;
1683     }
1684
1685     source_ss = source_master->sample_spec;
1686     source_ss.rate = DEFAULT_RATE;
1687     source_ss.channels = DEFAULT_CHANNELS;
1688     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1689
1690     sink_ss = sink_master->sample_spec;
1691     sink_map = sink_master->channel_map;
1692
1693     u = pa_xnew0(struct userdata, 1);
1694     if (!u) {
1695         pa_log("Failed to alloc userdata");
1696         goto fail;
1697     }
1698     u->core = m->core;
1699     u->module = m;
1700     m->userdata = u;
1701     u->dead = FALSE;
1702
1703     u->use_volume_sharing = TRUE;
1704     if (pa_modargs_get_value_boolean(ma, "use_volume_sharing", &u->use_volume_sharing) < 0) {
1705         pa_log("use_volume_sharing= expects a boolean argument");
1706         goto fail;
1707     }
1708
1709     temp = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1710     if (pa_modargs_get_value_u32(ma, "adjust_time", &temp) < 0) {
1711         pa_log("Failed to parse adjust_time value");
1712         goto fail;
1713     }
1714
1715     if (temp != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1716         u->adjust_time = temp * PA_USEC_PER_SEC;
1717     else
1718         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1719
1720     temp = DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC;
1721     if (pa_modargs_get_value_u32(ma, "adjust_threshold", &temp) < 0) {
1722         pa_log("Failed to parse adjust_threshold value");
1723         goto fail;
1724     }
1725
1726     if (temp != DEFAULT_ADJUST_TOLERANCE / PA_USEC_PER_MSEC)
1727         u->adjust_threshold = temp * PA_USEC_PER_MSEC;
1728     else
1729         u->adjust_threshold = DEFAULT_ADJUST_TOLERANCE;
1730
1731     u->save_aec = DEFAULT_SAVE_AEC;
1732     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1733         pa_log("Failed to parse save_aec value");
1734         goto fail;
1735     }
1736
1737     u->autoloaded = DEFAULT_AUTOLOADED;
1738     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1739         pa_log("Failed to parse autoloaded value");
1740         goto fail;
1741     }
1742
1743     if (init_common(ma, u, &source_ss, &source_map) < 0)
1744         goto fail;
1745
1746     u->asyncmsgq = pa_asyncmsgq_new(0);
1747     u->need_realign = TRUE;
1748
1749     pa_assert(u->ec->init);
1750     if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &nframes, pa_modargs_get_value(ma, "aec_args", NULL))) {
1751         pa_log("Failed to init AEC engine");
1752         goto fail;
1753     }
1754
1755     u->source_blocksize = nframes * pa_frame_size(&source_ss);
1756     u->sink_blocksize = nframes * pa_frame_size(&sink_ss);
1757
1758     if (u->ec->params.drift_compensation)
1759         pa_assert(u->ec->set_drift);
1760
1761     /* Create source */
1762     pa_source_new_data_init(&source_data);
1763     source_data.driver = __FILE__;
1764     source_data.module = m;
1765     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1766         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1767     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1768     pa_source_new_data_set_channel_map(&source_data, &source_map);
1769     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1770     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1771     if (!u->autoloaded)
1772         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1773
1774     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1775         pa_log("Invalid properties");
1776         pa_source_new_data_done(&source_data);
1777         goto fail;
1778     }
1779
1780     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1781         const char *y, *z;
1782
1783         y = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1784         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1785         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1786                 z ? z : source_master->name, y ? y : sink_master->name);
1787     }
1788
1789     u->source = pa_source_new(m->core, &source_data, (source_master->flags & (PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY))
1790                                                      | (u->use_volume_sharing ? PA_SOURCE_SHARE_VOLUME_WITH_MASTER : 0));
1791     pa_source_new_data_done(&source_data);
1792
1793     if (!u->source) {
1794         pa_log("Failed to create source.");
1795         goto fail;
1796     }
1797
1798     u->source->parent.process_msg = source_process_msg_cb;
1799     u->source->set_state = source_set_state_cb;
1800     u->source->update_requested_latency = source_update_requested_latency_cb;
1801     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1802     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1803     if (!u->use_volume_sharing) {
1804         pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1805         pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1806         pa_source_enable_decibel_volume(u->source, TRUE);
1807     }
1808     u->source->userdata = u;
1809
1810     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1811
1812     /* Create sink */
1813     pa_sink_new_data_init(&sink_data);
1814     sink_data.driver = __FILE__;
1815     sink_data.module = m;
1816     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1817         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1818     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1819     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1820     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1821     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1822     if (!u->autoloaded)
1823         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1824
1825     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1826         pa_log("Invalid properties");
1827         pa_sink_new_data_done(&sink_data);
1828         goto fail;
1829     }
1830
1831     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1832         const char *y, *z;
1833
1834         y = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1835         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1836         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s (echo cancelled with %s)",
1837                 z ? z : sink_master->name, y ? y : source_master->name);
1838     }
1839
1840     u->sink = pa_sink_new(m->core, &sink_data, (sink_master->flags & (PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY))
1841                                                | (u->use_volume_sharing ? PA_SINK_SHARE_VOLUME_WITH_MASTER : 0));
1842     pa_sink_new_data_done(&sink_data);
1843
1844     if (!u->sink) {
1845         pa_log("Failed to create sink.");
1846         goto fail;
1847     }
1848
1849     u->sink->parent.process_msg = sink_process_msg_cb;
1850     u->sink->set_state = sink_set_state_cb;
1851     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1852     u->sink->request_rewind = sink_request_rewind_cb;
1853     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1854     if (!u->use_volume_sharing) {
1855         pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1856         pa_sink_enable_decibel_volume(u->sink, TRUE);
1857     }
1858     u->sink->userdata = u;
1859
1860     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1861
1862     /* Create source output */
1863     pa_source_output_new_data_init(&source_output_data);
1864     source_output_data.driver = __FILE__;
1865     source_output_data.module = m;
1866     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1867     source_output_data.destination_source = u->source;
1868     /* FIXME
1869        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1870
1871     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1872     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1873     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1874     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1875
1876     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1877     pa_source_output_new_data_done(&source_output_data);
1878
1879     if (!u->source_output)
1880         goto fail;
1881
1882     u->source_output->parent.process_msg = source_output_process_msg_cb;
1883     u->source_output->push = source_output_push_cb;
1884     u->source_output->process_rewind = source_output_process_rewind_cb;
1885     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1886     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1887     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1888     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1889     u->source_output->kill = source_output_kill_cb;
1890     u->source_output->attach = source_output_attach_cb;
1891     u->source_output->detach = source_output_detach_cb;
1892     u->source_output->state_change = source_output_state_change_cb;
1893     u->source_output->may_move_to = source_output_may_move_to_cb;
1894     u->source_output->moving = source_output_moving_cb;
1895     u->source_output->userdata = u;
1896
1897     u->source->output_from_master = u->source_output;
1898
1899     /* Create sink input */
1900     pa_sink_input_new_data_init(&sink_input_data);
1901     sink_input_data.driver = __FILE__;
1902     sink_input_data.module = m;
1903     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1904     sink_input_data.origin_sink = u->sink;
1905     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1906     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1907     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1908     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1909     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1910
1911     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1912     pa_sink_input_new_data_done(&sink_input_data);
1913
1914     if (!u->sink_input)
1915         goto fail;
1916
1917     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1918     u->sink_input->pop = sink_input_pop_cb;
1919     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1920     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1921     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1922     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1923     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1924     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1925     u->sink_input->kill = sink_input_kill_cb;
1926     u->sink_input->attach = sink_input_attach_cb;
1927     u->sink_input->detach = sink_input_detach_cb;
1928     u->sink_input->state_change = sink_input_state_change_cb;
1929     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1930     u->sink_input->moving = sink_input_moving_cb;
1931     if (!u->use_volume_sharing)
1932         u->sink_input->volume_changed = sink_input_volume_changed_cb;
1933     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1934     u->sink_input->userdata = u;
1935
1936     u->sink->input_to_master = u->sink_input;
1937
1938     pa_sink_input_get_silence(u->sink_input, &silence);
1939
1940     u->source_memblockq = pa_memblockq_new("module-echo-cancel source_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1941         &source_ss, 1, 1, 0, &silence);
1942     u->sink_memblockq = pa_memblockq_new("module-echo-cancel sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0,
1943         &sink_ss, 1, 1, 0, &silence);
1944
1945     pa_memblock_unref(silence.memblock);
1946
1947     if (!u->source_memblockq || !u->sink_memblockq) {
1948         pa_log("Failed to create memblockq.");
1949         goto fail;
1950     }
1951
1952     if (u->adjust_time > 0 && !u->ec->params.drift_compensation)
1953         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1954     else if (u->ec->params.drift_compensation) {
1955         pa_log_info("Canceller does drift compensation -- built-in compensation will be disabled");
1956         u->adjust_time = 0;
1957         /* Perform resync just once to give the canceller a leg up */
1958         pa_atomic_store(&u->request_resync, 1);
1959     }
1960
1961     if (u->save_aec) {
1962         pa_log("Creating AEC files in /tmp");
1963         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1964         if (u->captured_file == NULL)
1965             perror ("fopen failed");
1966         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1967         if (u->played_file == NULL)
1968             perror ("fopen failed");
1969         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1970         if (u->canceled_file == NULL)
1971             perror ("fopen failed");
1972         if (u->ec->params.drift_compensation) {
1973             u->drift_file = fopen("/tmp/aec_drift.txt", "w");
1974             if (u->drift_file == NULL)
1975                 perror ("fopen failed");
1976         }
1977     }
1978
1979     u->ec->msg = pa_msgobject_new(pa_echo_canceller_msg);
1980     u->ec->msg->parent.process_msg = canceller_process_msg_cb;
1981     u->ec->msg->userdata = u;
1982
1983     u->thread_info.current_volume = u->source->reference_volume;
1984
1985     pa_sink_put(u->sink);
1986     pa_source_put(u->source);
1987
1988     pa_sink_input_put(u->sink_input);
1989     pa_source_output_put(u->source_output);
1990     pa_modargs_free(ma);
1991
1992     return 0;
1993
1994 fail:
1995     if (ma)
1996         pa_modargs_free(ma);
1997
1998     pa__done(m);
1999
2000     return -1;
2001 }
2002
2003 /* Called from main context. */
2004 int pa__get_n_used(pa_module *m) {
2005     struct userdata *u;
2006
2007     pa_assert(m);
2008     pa_assert_se(u = m->userdata);
2009
2010     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
2011 }
2012
2013 /* Called from main context. */
2014 void pa__done(pa_module*m) {
2015     struct userdata *u;
2016
2017     pa_assert(m);
2018
2019     if (!(u = m->userdata))
2020         return;
2021
2022     u->dead = TRUE;
2023
2024     /* See comments in source_output_kill_cb() above regarding
2025      * destruction order! */
2026
2027     if (u->time_event)
2028         u->core->mainloop->time_free(u->time_event);
2029
2030     if (u->source_output)
2031         pa_source_output_unlink(u->source_output);
2032     if (u->sink_input)
2033         pa_sink_input_unlink(u->sink_input);
2034
2035     if (u->source)
2036         pa_source_unlink(u->source);
2037     if (u->sink)
2038         pa_sink_unlink(u->sink);
2039
2040     if (u->source_output)
2041         pa_source_output_unref(u->source_output);
2042     if (u->sink_input)
2043         pa_sink_input_unref(u->sink_input);
2044
2045     if (u->source)
2046         pa_source_unref(u->source);
2047     if (u->sink)
2048         pa_sink_unref(u->sink);
2049
2050     if (u->source_memblockq)
2051         pa_memblockq_free(u->source_memblockq);
2052     if (u->sink_memblockq)
2053         pa_memblockq_free(u->sink_memblockq);
2054
2055     if (u->ec) {
2056         if (u->ec->done)
2057             u->ec->done(u->ec);
2058
2059         pa_xfree(u->ec);
2060     }
2061
2062     if (u->asyncmsgq)
2063         pa_asyncmsgq_unref(u->asyncmsgq);
2064
2065     if (u->save_aec) {
2066         if (u->played_file)
2067             fclose(u->played_file);
2068         if (u->captured_file)
2069             fclose(u->captured_file);
2070         if (u->canceled_file)
2071             fclose(u->canceled_file);
2072         if (u->drift_file)
2073             fclose(u->drift_file);
2074     }
2075
2076     pa_xfree(u);
2077 }
2078
2079 #ifdef ECHO_CANCEL_TEST
2080 /*
2081  * Stand-alone test program for running in the canceller on pre-recorded files.
2082  */
2083 int main(int argc, char* argv[]) {
2084     struct userdata u;
2085     pa_sample_spec source_ss, sink_ss;
2086     pa_channel_map source_map, sink_map;
2087     pa_modargs *ma = NULL;
2088     uint8_t *rdata = NULL, *pdata = NULL, *cdata = NULL;
2089     int unused PA_GCC_UNUSED;
2090     int ret = 0, i;
2091     char c;
2092     float drift;
2093     uint32_t nframes;
2094
2095     pa_memzero(&u, sizeof(u));
2096
2097     if (argc < 4 || argc > 7) {
2098         goto usage;
2099     }
2100
2101     u.captured_file = fopen(argv[2], "rb");
2102     if (u.captured_file == NULL) {
2103         perror ("fopen failed");
2104         goto fail;
2105     }
2106     u.played_file = fopen(argv[1], "rb");
2107     if (u.played_file == NULL) {
2108         perror ("fopen failed");
2109         goto fail;
2110     }
2111     u.canceled_file = fopen(argv[3], "wb");
2112     if (u.canceled_file == NULL) {
2113         perror ("fopen failed");
2114         goto fail;
2115     }
2116
2117     u.core = pa_xnew0(pa_core, 1);
2118     u.core->cpu_info.cpu_type = PA_CPU_X86;
2119     u.core->cpu_info.flags.x86 |= PA_CPU_X86_SSE;
2120
2121     if (!(ma = pa_modargs_new(argc > 4 ? argv[4] : NULL, valid_modargs))) {
2122         pa_log("Failed to parse module arguments.");
2123         goto fail;
2124     }
2125
2126     source_ss.format = PA_SAMPLE_S16LE;
2127     source_ss.rate = DEFAULT_RATE;
2128     source_ss.channels = DEFAULT_CHANNELS;
2129     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2130
2131     sink_ss.format = PA_SAMPLE_S16LE;
2132     sink_ss.rate = DEFAULT_RATE;
2133     sink_ss.channels = DEFAULT_CHANNELS;
2134     pa_channel_map_init_auto(&sink_map, sink_ss.channels, PA_CHANNEL_MAP_DEFAULT);
2135
2136     if (init_common(ma, &u, &source_ss, &source_map) < 0)
2137         goto fail;
2138
2139     if (!u.ec->init(u.core, u.ec, &source_ss, &source_map, &sink_ss, &sink_map, &nframes,
2140                      (argc > 5) ? argv[5] : NULL )) {
2141         pa_log("Failed to init AEC engine");
2142         goto fail;
2143     }
2144     u.source_blocksize = nframes * pa_frame_size(&source_ss);
2145     u.sink_blocksize = nframes * pa_frame_size(&sink_ss);
2146
2147     if (u.ec->params.drift_compensation) {
2148         if (argc < 7) {
2149             pa_log("Drift compensation enabled but drift file not specified");
2150             goto fail;
2151         }
2152
2153         u.drift_file = fopen(argv[6], "rt");
2154
2155         if (u.drift_file == NULL) {
2156             perror ("fopen failed");
2157             goto fail;
2158         }
2159     }
2160
2161     rdata = pa_xmalloc(u.source_blocksize);
2162     pdata = pa_xmalloc(u.sink_blocksize);
2163     cdata = pa_xmalloc(u.source_blocksize);
2164
2165     if (!u.ec->params.drift_compensation) {
2166         while (fread(rdata, u.source_blocksize, 1, u.captured_file) > 0) {
2167             if (fread(pdata, u.sink_blocksize, 1, u.played_file) == 0) {
2168                 perror("Played file ended before captured file");
2169                 goto fail;
2170             }
2171
2172             u.ec->run(u.ec, rdata, pdata, cdata);
2173
2174             unused = fwrite(cdata, u.source_blocksize, 1, u.canceled_file);
2175         }
2176     } else {
2177         while (fscanf(u.drift_file, "%c", &c) > 0) {
2178             switch (c) {
2179                 case 'd':
2180                     if (!fscanf(u.drift_file, "%a", &drift)) {
2181                         perror("Drift file incomplete");
2182                         goto fail;
2183                     }
2184
2185                     u.ec->set_drift(u.ec, drift);
2186
2187                     break;
2188
2189                 case 'c':
2190                     if (!fscanf(u.drift_file, "%d", &i)) {
2191                         perror("Drift file incomplete");
2192                         goto fail;
2193                     }
2194
2195                     if (fread(rdata, i, 1, u.captured_file) <= 0) {
2196                         perror("Captured file ended prematurely");
2197                         goto fail;
2198                     }
2199
2200                     u.ec->record(u.ec, rdata, cdata);
2201
2202                     unused = fwrite(cdata, i, 1, u.canceled_file);
2203
2204                     break;
2205
2206                 case 'p':
2207                     if (!fscanf(u.drift_file, "%d", &i)) {
2208                         perror("Drift file incomplete");
2209                         goto fail;
2210                     }
2211
2212                     if (fread(pdata, i, 1, u.played_file) <= 0) {
2213                         perror("Played file ended prematurely");
2214                         goto fail;
2215                     }
2216
2217                     u.ec->play(u.ec, pdata);
2218
2219                     break;
2220             }
2221         }
2222
2223         if (fread(rdata, i, 1, u.captured_file) > 0)
2224             pa_log("All capture data was not consumed");
2225         if (fread(pdata, i, 1, u.played_file) > 0)
2226             pa_log("All playback data was not consumed");
2227     }
2228
2229     u.ec->done(u.ec);
2230
2231 out:
2232     if (u.captured_file)
2233         fclose(u.captured_file);
2234     if (u.played_file)
2235         fclose(u.played_file);
2236     if (u.canceled_file)
2237         fclose(u.canceled_file);
2238     if (u.drift_file)
2239         fclose(u.drift_file);
2240
2241     pa_xfree(rdata);
2242     pa_xfree(pdata);
2243     pa_xfree(cdata);
2244
2245     pa_xfree(u.ec);
2246     pa_xfree(u.core);
2247
2248     if (ma)
2249         pa_modargs_free(ma);
2250
2251     return ret;
2252
2253 usage:
2254     pa_log("Usage: %s play_file rec_file out_file [module args] [aec_args] [drift_file]", argv[0]);
2255
2256 fail:
2257     ret = -1;
2258     goto out;
2259 }
2260 #endif /* ECHO_CANCEL_TEST */