echo-cancel: Fix echo suppression, add some knobs
[profile/ivi/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34 #include <math.h>
35
36 #include "echo-cancel.h"
37
38 #include <pulse/xmalloc.h>
39 #include <pulse/i18n.h>
40 #include <pulse/timeval.h>
41 #include <pulse/rtclock.h>
42
43 #include <pulsecore/atomic.h>
44 #include <pulsecore/macro.h>
45 #include <pulsecore/core-error.h>
46 #include <pulsecore/namereg.h>
47 #include <pulsecore/sink.h>
48 #include <pulsecore/module.h>
49 #include <pulsecore/core-rtclock.h>
50 #include <pulsecore/core-util.h>
51 #include <pulsecore/core-error.h>
52 #include <pulsecore/modargs.h>
53 #include <pulsecore/log.h>
54 #include <pulsecore/thread.h>
55 #include <pulsecore/thread-mq.h>
56 #include <pulsecore/rtpoll.h>
57 #include <pulsecore/sample-util.h>
58 #include <pulsecore/ltdl-helper.h>
59
60 #include "module-echo-cancel-symdef.h"
61
62 PA_MODULE_AUTHOR("Wim Taymans");
63 PA_MODULE_DESCRIPTION("Echo Cancelation");
64 PA_MODULE_VERSION(PACKAGE_VERSION);
65 PA_MODULE_LOAD_ONCE(FALSE);
66 PA_MODULE_USAGE(
67         _("source_name=<name for the source> "
68           "source_properties=<properties for the source> "
69           "source_master=<name of source to filter> "
70           "sink_name=<name for the sink> "
71           "sink_properties=<properties for the sink> "
72           "sink_master=<name of sink to filter> "
73           "adjust_time=<how often to readjust rates in s> "
74           "format=<sample format> "
75           "rate=<sample rate> "
76           "channels=<number of channels> "
77           "channel_map=<channel map> "
78           "aec_method=<implementation to use> "
79           "aec_args=<parameters for the AEC engine> "
80           "agc=<perform automagic gain control?> "
81           "denoise=<apply denoising?> "
82           "echo_suppress=<perform residual echo suppression? (only with the speex canceller)> "
83           "echo_suppress_attenuation=<dB value of residual echo attenuation> "
84           "echo_suppress_attenuation_active=<dB value of residual echo attenuation when near end is active> "
85           "save_aec=<save AEC data in /tmp> "
86           "autoloaded=<set if this module is being loaded automatically> "
87         ));
88
89 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
90 typedef enum {
91     PA_ECHO_CANCELLER_INVALID = -1,
92     PA_ECHO_CANCELLER_SPEEX = 0,
93     PA_ECHO_CANCELLER_ADRIAN,
94 } pa_echo_canceller_method_t;
95
96 #define DEFAULT_ECHO_CANCELLER "speex"
97
98 static const pa_echo_canceller ec_table[] = {
99     {
100         /* Speex */
101         .init                   = pa_speex_ec_init,
102         .run                    = pa_speex_ec_run,
103         .done                   = pa_speex_ec_done,
104     },
105     {
106         /* Adrian Andre's NLMS implementation */
107         .init                   = pa_adrian_ec_init,
108         .run                    = pa_adrian_ec_run,
109         .done                   = pa_adrian_ec_done,
110     },
111 };
112
113 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC)
114 #define DEFAULT_AGC_ENABLED FALSE
115 #define DEFAULT_DENOISE_ENABLED FALSE
116 #define DEFAULT_ECHO_SUPPRESS_ENABLED FALSE
117 #define DEFAULT_ECHO_SUPPRESS_ATTENUATION 0
118 #define DEFAULT_SAVE_AEC 0
119 #define DEFAULT_AUTOLOADED FALSE
120
121 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024)
122
123 /* This module creates a new (virtual) source and sink.
124  *
125  * The data sent to the new sink is kept in a memblockq before being
126  * forwarded to the real sink_master.
127  *
128  * Data read from source_master is matched against the saved sink data and
129  * echo canceled data is then pushed onto the new source.
130  *
131  * Both source and sink masters have their own threads to push/pull data
132  * respectively. We however perform all our actions in the source IO thread.
133  * To do this we send all played samples to the source IO thread where they
134  * are then pushed into the memblockq.
135  *
136  * Alignment is performed in two steps:
137  *
138  * 1) when something happens that requires quick adjustement of the alignment of
139  *    capture and playback samples, we perform a resync. This adjusts the
140  *    position in the playback memblock to the requested sample. Quick
141  *    adjustements include moving the playback samples before the capture
142  *    samples (because else the echo canceler does not work) or when the
143  *    playback pointer drifts too far away.
144  *
145  * 2) periodically check the difference between capture and playback. we use a
146  *    low and high watermark for adjusting the alignment. playback should always
147  *    be before capture and the difference should not be bigger than one frame
148  *    size. We would ideally like to resample the sink_input but most driver
149  *    don't give enough accuracy to be able to do that right now.
150  */
151
152 struct snapshot {
153     pa_usec_t sink_now;
154     pa_usec_t sink_latency;
155     size_t sink_delay;
156     int64_t send_counter;
157
158     pa_usec_t source_now;
159     pa_usec_t source_latency;
160     size_t source_delay;
161     int64_t recv_counter;
162     size_t rlen;
163     size_t plen;
164 };
165
166 struct userdata {
167     pa_core *core;
168     pa_module *module;
169
170     pa_bool_t autoloaded;
171     uint32_t save_aec;
172
173     pa_echo_canceller *ec;
174     uint32_t blocksize;
175
176     pa_bool_t need_realign;
177
178     /* to wakeup the source I/O thread */
179     pa_bool_t in_push;
180     pa_asyncmsgq *asyncmsgq;
181     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
182
183     pa_source *source;
184     pa_bool_t source_auto_desc;
185     pa_source_output *source_output;
186     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
187     size_t source_skip;
188
189     pa_sink *sink;
190     pa_bool_t sink_auto_desc;
191     pa_sink_input *sink_input;
192     pa_memblockq *sink_memblockq;
193     int64_t send_counter;          /* updated in sink IO thread */
194     int64_t recv_counter;
195     size_t sink_skip;
196
197     pa_atomic_t request_resync;
198
199     int active_mask;
200     pa_time_event *time_event;
201     pa_usec_t adjust_time;
202
203     FILE *captured_file;
204     FILE *played_file;
205     FILE *canceled_file;
206 };
207
208 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
209
210 static const char* const valid_modargs[] = {
211     "source_name",
212     "source_properties",
213     "source_master",
214     "sink_name",
215     "sink_properties",
216     "sink_master",
217     "adjust_time",
218     "format",
219     "rate",
220     "channels",
221     "channel_map",
222     "aec_method",
223     "aec_args",
224     "agc",
225     "denoise",
226     "echo_suppress",
227     "echo_suppress_attenuation",
228     "echo_suppress_attenuation_active",
229     "save_aec",
230     "autoloaded",
231     NULL
232 };
233
234 enum {
235     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX,
236     SOURCE_OUTPUT_MESSAGE_REWIND,
237     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT,
238     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME
239 };
240
241 enum {
242     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
243 };
244
245 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
246     int64_t buffer, diff_time, buffer_latency;
247
248     /* get the number of samples between capture and playback */
249     if (snapshot->plen > snapshot->rlen)
250         buffer = snapshot->plen - snapshot->rlen;
251     else
252         buffer = 0;
253
254     buffer += snapshot->source_delay + snapshot->sink_delay;
255
256     /* add the amount of samples not yet transfered to the source context */
257     if (snapshot->recv_counter <= snapshot->send_counter)
258         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
259     else
260         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
261
262     /* convert to time */
263     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
264
265     /* capture and playback samples are perfectly aligned when diff_time is 0 */
266     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
267           (snapshot->source_now - snapshot->source_latency);
268
269     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
270         (long long) snapshot->sink_latency,
271         (long long) buffer_latency, (long long) snapshot->source_latency,
272         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
273         (long long) (snapshot->send_counter - snapshot->recv_counter),
274         (long long) (snapshot->sink_now - snapshot->source_now));
275
276     return diff_time;
277 }
278
279 /* Called from main context */
280 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
281     struct userdata *u = userdata;
282     uint32_t old_rate, base_rate, new_rate;
283     int64_t diff_time;
284     /*size_t fs*/
285     struct snapshot latency_snapshot;
286
287     pa_assert(u);
288     pa_assert(a);
289     pa_assert(u->time_event == e);
290     pa_assert_ctl_context();
291
292     if (u->active_mask != 3)
293         return;
294
295     /* update our snapshots */
296     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
297     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
298
299     /* calculate drift between capture and playback */
300     diff_time = calc_diff(u, &latency_snapshot);
301
302     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
303     old_rate = u->sink_input->sample_spec.rate;
304     base_rate = u->source_output->sample_spec.rate;
305
306     if (diff_time < 0) {
307         /* recording before playback, we need to adjust quickly. The echo
308          * canceler does not work in this case. */
309         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
310             NULL, diff_time, NULL, NULL);
311         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
312         new_rate = base_rate;
313     }
314     else {
315         if (diff_time > 1000) {
316             /* diff too big, quickly adjust */
317             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
318                 NULL, diff_time, NULL, NULL);
319         }
320
321         /* recording behind playback, we need to slowly adjust the rate to match */
322         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
323
324         /* assume equal samplerates for now */
325         new_rate = base_rate;
326     }
327
328     /* make sure we don't make too big adjustements because that sounds horrible */
329     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
330         new_rate = base_rate;
331
332     if (new_rate != old_rate) {
333         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
334
335         pa_sink_input_set_rate(u->sink_input, new_rate);
336     }
337
338     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
339 }
340
341 /* Called from source I/O thread context */
342 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
343     struct userdata *u = PA_SOURCE(o)->userdata;
344
345     switch (code) {
346
347         case PA_SOURCE_MESSAGE_GET_LATENCY:
348
349             /* The source is _put() before the source output is, so let's
350              * make sure we don't access it in that time. Also, the
351              * source output is first shut down, the source second. */
352             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
353                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
354                 *((pa_usec_t*) data) = 0;
355                 return 0;
356             }
357
358             *((pa_usec_t*) data) =
359
360                 /* Get the latency of the master source */
361                 pa_source_get_latency_within_thread(u->source_output->source) +
362                 /* Add the latency internal to our source output on top */
363                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
364                 /* and the buffering we do on the source */
365                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
366
367             return 0;
368
369     }
370
371     return pa_source_process_msg(o, code, data, offset, chunk);
372 }
373
374 /* Called from sink I/O thread context */
375 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
376     struct userdata *u = PA_SINK(o)->userdata;
377
378     switch (code) {
379
380         case PA_SINK_MESSAGE_GET_LATENCY:
381
382             /* The sink is _put() before the sink input is, so let's
383              * make sure we don't access it in that time. Also, the
384              * sink input is first shut down, the sink second. */
385             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
386                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
387                 *((pa_usec_t*) data) = 0;
388                 return 0;
389             }
390
391             *((pa_usec_t*) data) =
392
393                 /* Get the latency of the master sink */
394                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
395
396                 /* Add the latency internal to our sink input on top */
397                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
398
399             return 0;
400     }
401
402     return pa_sink_process_msg(o, code, data, offset, chunk);
403 }
404
405
406 /* Called from main context */
407 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
408     struct userdata *u;
409
410     pa_source_assert_ref(s);
411     pa_assert_se(u = s->userdata);
412
413     if (!PA_SOURCE_IS_LINKED(state) ||
414         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
415         return 0;
416
417     pa_log_debug("Source state %d %d", state, u->active_mask);
418
419     if (state == PA_SOURCE_RUNNING) {
420         /* restart timer when both sink and source are active */
421         u->active_mask |= 1;
422         if (u->active_mask == 3)
423             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
424
425         pa_atomic_store(&u->request_resync, 1);
426         pa_source_output_cork(u->source_output, FALSE);
427     } else if (state == PA_SOURCE_SUSPENDED) {
428         u->active_mask &= ~1;
429         pa_source_output_cork(u->source_output, TRUE);
430     }
431     return 0;
432 }
433
434 /* Called from main context */
435 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
436     struct userdata *u;
437
438     pa_sink_assert_ref(s);
439     pa_assert_se(u = s->userdata);
440
441     if (!PA_SINK_IS_LINKED(state) ||
442         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
443         return 0;
444
445     pa_log_debug("Sink state %d %d", state, u->active_mask);
446
447     if (state == PA_SINK_RUNNING) {
448         /* restart timer when both sink and source are active */
449         u->active_mask |= 2;
450         if (u->active_mask == 3)
451             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
452
453         pa_atomic_store(&u->request_resync, 1);
454         pa_sink_input_cork(u->sink_input, FALSE);
455     } else if (state == PA_SINK_SUSPENDED) {
456         u->active_mask &= ~2;
457         pa_sink_input_cork(u->sink_input, TRUE);
458     }
459     return 0;
460 }
461
462 /* Called from I/O thread context */
463 static void source_update_requested_latency_cb(pa_source *s) {
464     struct userdata *u;
465
466     pa_source_assert_ref(s);
467     pa_assert_se(u = s->userdata);
468
469     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
470         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
471         return;
472
473     pa_log_debug("Source update requested latency");
474
475     /* Just hand this one over to the master source */
476     pa_source_output_set_requested_latency_within_thread(
477             u->source_output,
478             pa_source_get_requested_latency_within_thread(s));
479 }
480
481 /* Called from I/O thread context */
482 static void sink_update_requested_latency_cb(pa_sink *s) {
483     struct userdata *u;
484
485     pa_sink_assert_ref(s);
486     pa_assert_se(u = s->userdata);
487
488     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
489         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
490         return;
491
492     pa_log_debug("Sink update requested latency");
493
494     /* Just hand this one over to the master sink */
495     pa_sink_input_set_requested_latency_within_thread(
496             u->sink_input,
497             pa_sink_get_requested_latency_within_thread(s));
498 }
499
500 /* Called from I/O thread context */
501 static void sink_request_rewind_cb(pa_sink *s) {
502     struct userdata *u;
503
504     pa_sink_assert_ref(s);
505     pa_assert_se(u = s->userdata);
506
507     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
508         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
509         return;
510
511     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
512
513     /* Just hand this one over to the master sink */
514     pa_sink_input_request_rewind(u->sink_input,
515                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
516 }
517
518 /* Called from main context */
519 static void source_set_volume_cb(pa_source *s) {
520     struct userdata *u;
521
522     pa_source_assert_ref(s);
523     pa_assert_se(u = s->userdata);
524
525     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
526         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
527         return;
528
529     /* FIXME, no volume control in source_output, set volume at the master */
530     pa_source_set_volume(u->source_output->source, &s->volume, TRUE);
531 }
532
533 /* Called from main context */
534 static void sink_set_volume_cb(pa_sink *s) {
535     struct userdata *u;
536
537     pa_sink_assert_ref(s);
538     pa_assert_se(u = s->userdata);
539
540     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
541         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
542         return;
543
544     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
545 }
546
547 static void source_get_volume_cb(pa_source *s) {
548     struct userdata *u;
549
550     pa_source_assert_ref(s);
551     pa_assert_se(u = s->userdata);
552
553     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
554         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
555         return;
556
557     /* FIXME, no volume control in source_output, get the info from the master */
558     pa_source_get_volume(u->source_output->source, TRUE);
559
560     if (pa_cvolume_equal(&s->volume,&u->source_output->source->volume))
561         /* no change */
562         return;
563
564     s->volume = u->source_output->source->volume;
565     pa_source_set_soft_volume(s, NULL);
566 }
567
568
569 /* Called from main context */
570 static void source_set_mute_cb(pa_source *s) {
571     struct userdata *u;
572
573     pa_source_assert_ref(s);
574     pa_assert_se(u = s->userdata);
575
576     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
577         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
578         return;
579
580     /* FIXME, no volume control in source_output, set mute at the master */
581     pa_source_set_mute(u->source_output->source, TRUE, TRUE);
582 }
583
584 /* Called from main context */
585 static void sink_set_mute_cb(pa_sink *s) {
586     struct userdata *u;
587
588     pa_sink_assert_ref(s);
589     pa_assert_se(u = s->userdata);
590
591     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
592         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
593         return;
594
595     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
596 }
597
598 /* Called from main context */
599 static void source_get_mute_cb(pa_source *s) {
600     struct userdata *u;
601
602     pa_source_assert_ref(s);
603     pa_assert_se(u = s->userdata);
604
605     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
606         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
607         return;
608
609     /* FIXME, no volume control in source_output, get the info from the master */
610     pa_source_get_mute(u->source_output->source, TRUE);
611 }
612
613 /* must be called from the input thread context */
614 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
615     int64_t diff;
616
617     if (diff_time < 0) {
618         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
619
620         if (diff > 0) {
621             /* add some extra safety samples to compensate for jitter in the
622              * timings */
623             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
624
625             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
626
627             u->sink_skip = diff;
628             u->source_skip = 0;
629         }
630     } else if (diff_time > 0) {
631         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
632
633         if (diff > 0) {
634             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
635
636             u->source_skip = diff;
637             u->sink_skip = 0;
638         }
639     }
640 }
641
642 /* must be called from the input thread */
643 static void do_resync(struct userdata *u) {
644     int64_t diff_time;
645     struct snapshot latency_snapshot;
646
647     pa_log("Doing resync");
648
649     /* update our snapshot */
650     source_output_snapshot_within_thread(u, &latency_snapshot);
651     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
652
653     /* calculate drift between capture and playback */
654     diff_time = calc_diff(u, &latency_snapshot);
655
656     /* and adjust for the drift */
657     apply_diff_time(u, diff_time);
658 }
659
660 /* Called from input thread context */
661 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
662     struct userdata *u;
663     size_t rlen, plen;
664
665     pa_source_output_assert_ref(o);
666     pa_source_output_assert_io_context(o);
667     pa_assert_se(u = o->userdata);
668
669     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
670         pa_log("push when no link?");
671         return;
672     }
673
674     /* handle queued messages */
675     u->in_push = TRUE;
676     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
677         ;
678     u->in_push = FALSE;
679
680     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
681         do_resync(u);
682     }
683
684     pa_memblockq_push_align(u->source_memblockq, chunk);
685
686     rlen = pa_memblockq_get_length(u->source_memblockq);
687     plen = pa_memblockq_get_length(u->sink_memblockq);
688
689     while (rlen >= u->blocksize) {
690         pa_memchunk rchunk, pchunk;
691
692         /* take fixed block from recorded samples */
693         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
694
695         if (plen > u->blocksize && u->source_skip == 0) {
696             uint8_t *rdata, *pdata, *cdata;
697             pa_memchunk cchunk;
698
699             if (u->sink_skip) {
700                 size_t to_skip;
701
702                 if (u->sink_skip > plen)
703                     to_skip = plen;
704                 else
705                     to_skip = u->sink_skip;
706
707                 pa_memblockq_drop(u->sink_memblockq, to_skip);
708                 plen -= to_skip;
709
710                 u->sink_skip -= to_skip;
711             }
712
713             if (plen > u->blocksize && u->sink_skip == 0) {
714                 /* take fixed block from played samples */
715                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
716
717                 rdata = pa_memblock_acquire(rchunk.memblock);
718                 rdata += rchunk.index;
719                 pdata = pa_memblock_acquire(pchunk.memblock);
720                 pdata += pchunk.index;
721
722                 cchunk.index = 0;
723                 cchunk.length = u->blocksize;
724                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
725                 cdata = pa_memblock_acquire(cchunk.memblock);
726
727                 if (u->save_aec) {
728                     if (u->captured_file)
729                         fwrite(rdata, 1, u->blocksize, u->captured_file);
730                     if (u->played_file)
731                         fwrite(pdata, 1, u->blocksize, u->played_file);
732                 }
733
734                 /* perform echo cancelation */
735                 u->ec->run(u->ec, rdata, pdata, cdata);
736
737                 /* preprecessor is run after AEC. This is not a mistake! */
738                 if (u->ec->pp_state)
739                     speex_preprocess_run(u->ec->pp_state, (spx_int16_t *) cdata);
740
741                 if (u->save_aec) {
742                     if (u->canceled_file)
743                         fwrite(cdata, 1, u->blocksize, u->canceled_file);
744                 }
745
746                 pa_memblock_release(cchunk.memblock);
747                 pa_memblock_release(pchunk.memblock);
748                 pa_memblock_release(rchunk.memblock);
749
750                 /* drop consumed sink samples */
751                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
752                 pa_memblock_unref(pchunk.memblock);
753
754                 pa_memblock_unref(rchunk.memblock);
755                 /* the filtered samples now become the samples from our
756                  * source */
757                 rchunk = cchunk;
758
759                 plen -= u->blocksize;
760             }
761         }
762
763         /* forward the (echo-canceled) data to the virtual source */
764         pa_source_post(u->source, &rchunk);
765         pa_memblock_unref(rchunk.memblock);
766
767         pa_memblockq_drop(u->source_memblockq, u->blocksize);
768         rlen -= u->blocksize;
769
770         if (u->source_skip) {
771             if (u->source_skip > u->blocksize) {
772                 u->source_skip -= u->blocksize;
773             }
774             else {
775                 u->sink_skip += (u->blocksize - u->source_skip);
776                 u->source_skip = 0;
777             }
778         }
779     }
780 }
781
782 /* Called from I/O thread context */
783 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
784     struct userdata *u;
785
786     pa_sink_input_assert_ref(i);
787     pa_assert(chunk);
788     pa_assert_se(u = i->userdata);
789
790     if (u->sink->thread_info.rewind_requested)
791         pa_sink_process_rewind(u->sink, 0);
792
793     pa_sink_render_full(u->sink, nbytes, chunk);
794
795     if (i->thread_info.underrun_for > 0) {
796         pa_log_debug("Handling end of underrun.");
797         pa_atomic_store(&u->request_resync, 1);
798     }
799
800     /* let source thread handle the chunk. pass the sample count as well so that
801      * the source IO thread can update the right variables. */
802     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
803         NULL, 0, chunk, NULL);
804     u->send_counter += chunk->length;
805
806     return 0;
807 }
808
809 /* Called from input thread context */
810 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
811     struct userdata *u;
812
813     pa_source_output_assert_ref(o);
814     pa_source_output_assert_io_context(o);
815     pa_assert_se(u = o->userdata);
816
817     pa_source_process_rewind(u->source, nbytes);
818
819     /* go back on read side, we need to use older sink data for this */
820     pa_memblockq_rewind(u->sink_memblockq, nbytes);
821
822     /* manipulate write index */
823     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
824
825     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
826         (long long) pa_memblockq_get_length (u->source_memblockq));
827 }
828
829 /* Called from I/O thread context */
830 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
831     struct userdata *u;
832
833     pa_sink_input_assert_ref(i);
834     pa_assert_se(u = i->userdata);
835
836     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
837
838     pa_sink_process_rewind(u->sink, nbytes);
839
840     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
841     u->send_counter -= nbytes;
842 }
843
844 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
845     size_t delay, rlen, plen;
846     pa_usec_t now, latency;
847
848     now = pa_rtclock_now();
849     latency = pa_source_get_latency_within_thread(u->source_output->source);
850     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
851
852     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
853     rlen = pa_memblockq_get_length(u->source_memblockq);
854     plen = pa_memblockq_get_length(u->sink_memblockq);
855
856     snapshot->source_now = now;
857     snapshot->source_latency = latency;
858     snapshot->source_delay = delay;
859     snapshot->recv_counter = u->recv_counter;
860     snapshot->rlen = rlen + u->sink_skip;
861     snapshot->plen = plen + u->source_skip;
862 }
863
864
865 /* Called from output thread context */
866 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
867     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
868
869     switch (code) {
870
871         case SOURCE_OUTPUT_MESSAGE_POST:
872
873             pa_source_output_assert_io_context(u->source_output);
874
875             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
876                 pa_memblockq_push_align(u->sink_memblockq, chunk);
877             else
878                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
879
880             u->recv_counter += (int64_t) chunk->length;
881
882             return 0;
883
884         case SOURCE_OUTPUT_MESSAGE_REWIND:
885             pa_source_output_assert_io_context(u->source_output);
886
887             /* manipulate write index, never go past what we have */
888             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
889                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
890             else
891                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
892
893             pa_log_debug("Sink rewind (%lld)", (long long) offset);
894
895             u->recv_counter -= offset;
896
897             return 0;
898
899         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
900             struct snapshot *snapshot = (struct snapshot *) data;
901
902             source_output_snapshot_within_thread(u, snapshot);
903             return 0;
904         }
905
906         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
907             apply_diff_time(u, offset);
908             return 0;
909
910     }
911
912     return pa_source_output_process_msg(obj, code, data, offset, chunk);
913 }
914
915 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
916     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
917
918     switch (code) {
919
920         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
921             size_t delay;
922             pa_usec_t now, latency;
923             struct snapshot *snapshot = (struct snapshot *) data;
924
925             pa_sink_input_assert_io_context(u->sink_input);
926
927             now = pa_rtclock_now();
928             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
929             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
930
931             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
932
933             snapshot->sink_now = now;
934             snapshot->sink_latency = latency;
935             snapshot->sink_delay = delay;
936             snapshot->send_counter = u->send_counter;
937             return 0;
938         }
939     }
940
941     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
942 }
943
944 /* Called from I/O thread context */
945 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
946     struct userdata *u;
947
948     pa_sink_input_assert_ref(i);
949     pa_assert_se(u = i->userdata);
950
951     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
952
953     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
954     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
955 }
956
957 /* Called from I/O thread context */
958 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
959     struct userdata *u;
960
961     pa_source_output_assert_ref(o);
962     pa_assert_se(u = o->userdata);
963
964     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
965
966     pa_source_set_max_rewind_within_thread(u->source, nbytes);
967 }
968
969 /* Called from I/O thread context */
970 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
971     struct userdata *u;
972
973     pa_sink_input_assert_ref(i);
974     pa_assert_se(u = i->userdata);
975
976     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
977
978     pa_sink_set_max_request_within_thread(u->sink, nbytes);
979 }
980
981 /* Called from I/O thread context */
982 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
983     struct userdata *u;
984     pa_usec_t latency;
985
986     pa_sink_input_assert_ref(i);
987     pa_assert_se(u = i->userdata);
988
989     latency = pa_sink_get_requested_latency_within_thread(i->sink);
990
991     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
992 }
993
994 /* Called from I/O thread context */
995 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
996     struct userdata *u;
997     pa_usec_t latency;
998
999     pa_source_output_assert_ref(o);
1000     pa_assert_se(u = o->userdata);
1001
1002     latency = pa_source_get_requested_latency_within_thread(o->source);
1003
1004     pa_log_debug("source output update requested latency %lld", (long long) latency);
1005 }
1006
1007 /* Called from I/O thread context */
1008 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
1009     struct userdata *u;
1010
1011     pa_sink_input_assert_ref(i);
1012     pa_assert_se(u = i->userdata);
1013
1014     pa_log_debug("Sink input update latency range %lld %lld",
1015         (long long) i->sink->thread_info.min_latency,
1016         (long long) i->sink->thread_info.max_latency);
1017
1018     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1019 }
1020
1021 /* Called from I/O thread context */
1022 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
1023     struct userdata *u;
1024
1025     pa_source_output_assert_ref(o);
1026     pa_assert_se(u = o->userdata);
1027
1028     pa_log_debug("Source output update latency range %lld %lld",
1029         (long long) o->source->thread_info.min_latency,
1030         (long long) o->source->thread_info.max_latency);
1031
1032     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1033 }
1034
1035 /* Called from I/O thread context */
1036 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1037     struct userdata *u;
1038
1039     pa_sink_input_assert_ref(i);
1040     pa_assert_se(u = i->userdata);
1041
1042     pa_log_debug("Sink input update fixed latency %lld",
1043         (long long) i->sink->thread_info.fixed_latency);
1044
1045     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1046 }
1047
1048 /* Called from I/O thread context */
1049 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1050     struct userdata *u;
1051
1052     pa_source_output_assert_ref(o);
1053     pa_assert_se(u = o->userdata);
1054
1055     pa_log_debug("Source output update fixed latency %lld",
1056         (long long) o->source->thread_info.fixed_latency);
1057
1058     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1059 }
1060
1061 /* Called from output thread context */
1062 static void source_output_attach_cb(pa_source_output *o) {
1063     struct userdata *u;
1064
1065     pa_source_output_assert_ref(o);
1066     pa_source_output_assert_io_context(o);
1067     pa_assert_se(u = o->userdata);
1068
1069     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1070     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1071     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1072     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1073
1074     pa_log_debug("Source output %p attach", o);
1075
1076     pa_source_attach_within_thread(u->source);
1077
1078     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1079             o->source->thread_info.rtpoll,
1080             PA_RTPOLL_LATE,
1081             u->asyncmsgq);
1082 }
1083
1084 /* Called from I/O thread context */
1085 static void sink_input_attach_cb(pa_sink_input *i) {
1086     struct userdata *u;
1087
1088     pa_sink_input_assert_ref(i);
1089     pa_assert_se(u = i->userdata);
1090
1091     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1092     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1093
1094     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1095      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1096     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1097
1098     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1099      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1100      * HERE. SEE (6) */
1101     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1102     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1103
1104     pa_log_debug("Sink input %p attach", i);
1105
1106     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1107             i->sink->thread_info.rtpoll,
1108             PA_RTPOLL_LATE,
1109             u->asyncmsgq);
1110
1111     pa_sink_attach_within_thread(u->sink);
1112 }
1113
1114
1115 /* Called from output thread context */
1116 static void source_output_detach_cb(pa_source_output *o) {
1117     struct userdata *u;
1118
1119     pa_source_output_assert_ref(o);
1120     pa_source_output_assert_io_context(o);
1121     pa_assert_se(u = o->userdata);
1122
1123     pa_source_detach_within_thread(u->source);
1124     pa_source_set_rtpoll(u->source, NULL);
1125
1126     pa_log_debug("Source output %p detach", o);
1127
1128     if (u->rtpoll_item_read) {
1129         pa_rtpoll_item_free(u->rtpoll_item_read);
1130         u->rtpoll_item_read = NULL;
1131     }
1132 }
1133
1134 /* Called from I/O thread context */
1135 static void sink_input_detach_cb(pa_sink_input *i) {
1136     struct userdata *u;
1137
1138     pa_sink_input_assert_ref(i);
1139     pa_assert_se(u = i->userdata);
1140
1141     pa_sink_detach_within_thread(u->sink);
1142
1143     pa_sink_set_rtpoll(u->sink, NULL);
1144
1145     pa_log_debug("Sink input %p detach", i);
1146
1147     if (u->rtpoll_item_write) {
1148         pa_rtpoll_item_free(u->rtpoll_item_write);
1149         u->rtpoll_item_write = NULL;
1150     }
1151 }
1152
1153 /* Called from output thread context */
1154 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1155     struct userdata *u;
1156
1157     pa_source_output_assert_ref(o);
1158     pa_source_output_assert_io_context(o);
1159     pa_assert_se(u = o->userdata);
1160
1161     pa_log_debug("Source output %p state %d", o, state);
1162 }
1163
1164 /* Called from IO thread context */
1165 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1166     struct userdata *u;
1167
1168     pa_sink_input_assert_ref(i);
1169     pa_assert_se(u = i->userdata);
1170
1171     pa_log_debug("Sink input %p state %d", i, state);
1172
1173     /* If we are added for the first time, ask for a rewinding so that
1174      * we are heard right-away. */
1175     if (PA_SINK_INPUT_IS_LINKED(state) &&
1176         i->thread_info.state == PA_SINK_INPUT_INIT) {
1177         pa_log_debug("Requesting rewind due to state change.");
1178         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1179     }
1180 }
1181
1182 /* Called from main thread */
1183 static void source_output_kill_cb(pa_source_output *o) {
1184     struct userdata *u;
1185
1186     pa_source_output_assert_ref(o);
1187     pa_assert_ctl_context();
1188     pa_assert_se(u = o->userdata);
1189
1190     /* The order here matters! We first kill the source output, followed
1191      * by the source. That means the source callbacks must be protected
1192      * against an unconnected source output! */
1193     pa_source_output_unlink(u->source_output);
1194     pa_source_unlink(u->source);
1195
1196     pa_source_output_unref(u->source_output);
1197     u->source_output = NULL;
1198
1199     pa_source_unref(u->source);
1200     u->source = NULL;
1201
1202     pa_log_debug("Source output kill %p", o);
1203
1204     pa_module_unload_request(u->module, TRUE);
1205 }
1206
1207 /* Called from main context */
1208 static void sink_input_kill_cb(pa_sink_input *i) {
1209     struct userdata *u;
1210
1211     pa_sink_input_assert_ref(i);
1212     pa_assert_se(u = i->userdata);
1213
1214     /* The order here matters! We first kill the sink input, followed
1215      * by the sink. That means the sink callbacks must be protected
1216      * against an unconnected sink input! */
1217     pa_sink_input_unlink(u->sink_input);
1218     pa_sink_unlink(u->sink);
1219
1220     pa_sink_input_unref(u->sink_input);
1221     u->sink_input = NULL;
1222
1223     pa_sink_unref(u->sink);
1224     u->sink = NULL;
1225
1226     pa_log_debug("Sink input kill %p", i);
1227
1228     pa_module_unload_request(u->module, TRUE);
1229 }
1230
1231 /* Called from main thread */
1232 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1233     struct userdata *u;
1234
1235     pa_source_output_assert_ref(o);
1236     pa_assert_ctl_context();
1237     pa_assert_se(u = o->userdata);
1238
1239     return (u->source != dest) && (u->sink != dest->monitor_of);
1240 }
1241
1242 /* Called from main context */
1243 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1244     struct userdata *u;
1245
1246     pa_sink_input_assert_ref(i);
1247     pa_assert_se(u = i->userdata);
1248
1249     return u->sink != dest;
1250 }
1251
1252 /* Called from main thread */
1253 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1254     struct userdata *u;
1255
1256     pa_source_output_assert_ref(o);
1257     pa_assert_ctl_context();
1258     pa_assert_se(u = o->userdata);
1259
1260     if (dest) {
1261         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1262         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1263     } else
1264         pa_source_set_asyncmsgq(u->source, NULL);
1265
1266     if (u->source_auto_desc && dest) {
1267         const char *z;
1268         pa_proplist *pl;
1269
1270         pl = pa_proplist_new();
1271         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1272         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s",
1273                          pa_proplist_gets(u->source->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1274
1275         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1276         pa_proplist_free(pl);
1277     }
1278 }
1279
1280 /* Called from main context */
1281 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1282     struct userdata *u;
1283
1284     pa_sink_input_assert_ref(i);
1285     pa_assert_se(u = i->userdata);
1286
1287     if (dest) {
1288         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1289         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1290     } else
1291         pa_sink_set_asyncmsgq(u->sink, NULL);
1292
1293     if (u->sink_auto_desc && dest) {
1294         const char *z;
1295         pa_proplist *pl;
1296
1297         pl = pa_proplist_new();
1298         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1299         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s",
1300                          pa_proplist_gets(u->sink->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1301
1302         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1303         pa_proplist_free(pl);
1304     }
1305 }
1306
1307 /* Called from main context */
1308 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1309     struct userdata *u;
1310
1311     pa_sink_input_assert_ref(i);
1312     pa_assert_se(u = i->userdata);
1313
1314     pa_sink_volume_changed(u->sink, &i->volume);
1315 }
1316
1317 /* Called from main context */
1318 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1319     struct userdata *u;
1320
1321     pa_sink_input_assert_ref(i);
1322     pa_assert_se(u = i->userdata);
1323
1324     pa_sink_mute_changed(u->sink, i->muted);
1325 }
1326
1327 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1328     if (strcmp(method, "speex") == 0)
1329         return PA_ECHO_CANCELLER_SPEEX;
1330     else if (strcmp(method, "adrian") == 0)
1331         return PA_ECHO_CANCELLER_ADRIAN;
1332     else
1333         return PA_ECHO_CANCELLER_INVALID;
1334 }
1335
1336 int pa__init(pa_module*m) {
1337     struct userdata *u;
1338     pa_sample_spec source_ss, sink_ss;
1339     pa_channel_map source_map, sink_map;
1340     pa_modargs *ma;
1341     pa_source *source_master=NULL;
1342     pa_sink *sink_master=NULL;
1343     pa_source_output_new_data source_output_data;
1344     pa_sink_input_new_data sink_input_data;
1345     pa_source_new_data source_data;
1346     pa_sink_new_data sink_data;
1347     pa_memchunk silence;
1348     pa_echo_canceller_method_t ec_method;
1349     uint32_t adjust_time_sec;
1350
1351     pa_assert(m);
1352
1353     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1354         pa_log("Failed to parse module arguments.");
1355         goto fail;
1356     }
1357
1358     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1359         pa_log("Master source not found");
1360         goto fail;
1361     }
1362     pa_assert(source_master);
1363
1364     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1365         pa_log("Master sink not found");
1366         goto fail;
1367     }
1368     pa_assert(sink_master);
1369
1370     source_ss = source_master->sample_spec;
1371     source_map = source_master->channel_map;
1372     if (pa_modargs_get_sample_spec_and_channel_map(ma, &source_ss, &source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1373         pa_log("Invalid sample format specification or channel map");
1374         goto fail;
1375     }
1376
1377     sink_ss = sink_master->sample_spec;
1378     sink_map = sink_master->channel_map;
1379
1380     u = pa_xnew0(struct userdata, 1);
1381     if (!u) {
1382         pa_log("Failed to alloc userdata");
1383         goto fail;
1384     }
1385     u->core = m->core;
1386     u->module = m;
1387     m->userdata = u;
1388
1389     u->ec = pa_xnew0(pa_echo_canceller, 1);
1390     if (!u->ec) {
1391         pa_log("Failed to alloc echo canceller");
1392         goto fail;
1393     }
1394
1395     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1396         pa_log("Invalid echo canceller implementation");
1397         goto fail;
1398     }
1399
1400     u->ec->init = ec_table[ec_method].init;
1401     u->ec->run = ec_table[ec_method].run;
1402     u->ec->done = ec_table[ec_method].done;
1403
1404     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1405     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1406         pa_log("Failed to parse adjust_time value");
1407         goto fail;
1408     }
1409
1410     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1411         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1412     else
1413         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1414
1415     u->ec->agc = DEFAULT_AGC_ENABLED;
1416     if (pa_modargs_get_value_boolean(ma, "agc", &u->ec->agc) < 0) {
1417         pa_log("Failed to parse agc value");
1418         goto fail;
1419     }
1420
1421     u->ec->denoise = DEFAULT_DENOISE_ENABLED;
1422     if (pa_modargs_get_value_boolean(ma, "denoise", &u->ec->denoise) < 0) {
1423         pa_log("Failed to parse denoise value");
1424         goto fail;
1425     }
1426
1427     u->ec->echo_suppress = DEFAULT_ECHO_SUPPRESS_ENABLED;
1428     if (pa_modargs_get_value_boolean(ma, "echo_suppress", &u->ec->echo_suppress) < 0) {
1429         pa_log("Failed to parse echo_suppress value");
1430         goto fail;
1431     }
1432     if (u->ec->echo_suppress && ec_method != PA_ECHO_CANCELLER_SPEEX) {
1433         pa_log("Echo suppression is only useful with the speex canceller");
1434         goto fail;
1435     }
1436
1437     u->ec->echo_suppress_attenuation = DEFAULT_ECHO_SUPPRESS_ATTENUATION;
1438     if (pa_modargs_get_value_s32(ma, "echo_suppress_attenuation", &u->ec->echo_suppress_attenuation) < 0) {
1439         pa_log("Failed to parse echo_suppress_attenuation value");
1440         goto fail;
1441     }
1442     if (u->ec->echo_suppress_attenuation > 0) {
1443         pa_log("echo_suppress_attenuation should be a negative dB value");
1444         goto fail;
1445     }
1446
1447     u->ec->echo_suppress_attenuation_active = DEFAULT_ECHO_SUPPRESS_ATTENUATION;
1448     if (pa_modargs_get_value_s32(ma, "echo_suppress_attenuation_active", &u->ec->echo_suppress_attenuation_active) < 0) {
1449         pa_log("Failed to parse echo_supress_attenuation_active value");
1450         goto fail;
1451     }
1452     if (u->ec->echo_suppress_attenuation_active > 0) {
1453         pa_log("echo_suppress_attenuation_active should be a negative dB value");
1454         goto fail;
1455     }
1456
1457     u->save_aec = DEFAULT_SAVE_AEC;
1458     if (pa_modargs_get_value_u32(ma, "save_aec", &u->save_aec) < 0) {
1459         pa_log("Failed to parse save_aec value");
1460         goto fail;
1461     }
1462
1463     u->autoloaded = DEFAULT_AUTOLOADED;
1464     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1465         pa_log("Failed to parse autoloaded value");
1466         goto fail;
1467     }
1468
1469     u->asyncmsgq = pa_asyncmsgq_new(0);
1470     u->need_realign = TRUE;
1471     if (u->ec->init) {
1472         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1473             pa_log("Failed to init AEC engine");
1474             goto fail;
1475         }
1476     }
1477
1478     if (u->ec->agc || u->ec->denoise || u->ec->echo_suppress) {
1479         if (source_ss.channels != 1) {
1480             pa_log("AGC, denoising and echo suppression only work with channels=1");
1481             goto fail;
1482         }
1483
1484         u->ec->pp_state = speex_preprocess_state_init(u->blocksize, source_ss.rate);
1485
1486         speex_preprocess_ctl(u->ec->pp_state, SPEEX_PREPROCESS_SET_AGC, &u->ec->agc);
1487         speex_preprocess_ctl(u->ec->pp_state, SPEEX_PREPROCESS_SET_DENOISE, &u->ec->denoise);
1488         if (u->ec->echo_suppress) {
1489             if (u->ec->echo_suppress_attenuation)
1490                 speex_preprocess_ctl(u->ec->pp_state, SPEEX_PREPROCESS_SET_ECHO_SUPPRESS, &u->ec->echo_suppress_attenuation);
1491             if (u->ec->echo_suppress_attenuation_active) {
1492                 speex_preprocess_ctl(u->ec->pp_state, SPEEX_PREPROCESS_SET_ECHO_SUPPRESS_ACTIVE,
1493                                      &u->ec->echo_suppress_attenuation_active);
1494             }
1495             speex_preprocess_ctl(u->ec->pp_state, SPEEX_PREPROCESS_SET_ECHO_STATE, u->ec->params.priv.speex.state);
1496         }
1497     }
1498
1499     /* Create source */
1500     pa_source_new_data_init(&source_data);
1501     source_data.driver = __FILE__;
1502     source_data.module = m;
1503     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1504         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1505     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1506     pa_source_new_data_set_channel_map(&source_data, &source_map);
1507     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1508     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1509     if (!u->autoloaded)
1510         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1511     pa_proplist_sets(source_data.proplist, "device.echo-cancel.name", source_data.name);
1512
1513     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1514         pa_log("Invalid properties");
1515         pa_source_new_data_done(&source_data);
1516         goto fail;
1517     }
1518
1519     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1520         const char *z;
1521
1522         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1523         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s", source_data.name, z ? z : source_master->name);
1524     }
1525
1526     u->source = pa_source_new(m->core, &source_data,
1527                           PA_SOURCE_HW_MUTE_CTRL|PA_SOURCE_HW_VOLUME_CTRL|PA_SOURCE_DECIBEL_VOLUME|
1528                           (source_master->flags & (PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY)));
1529     pa_source_new_data_done(&source_data);
1530
1531     if (!u->source) {
1532         pa_log("Failed to create source.");
1533         goto fail;
1534     }
1535
1536     u->source->parent.process_msg = source_process_msg_cb;
1537     u->source->set_state = source_set_state_cb;
1538     u->source->update_requested_latency = source_update_requested_latency_cb;
1539     u->source->set_volume = source_set_volume_cb;
1540     u->source->set_mute = source_set_mute_cb;
1541     u->source->get_volume = source_get_volume_cb;
1542     u->source->get_mute = source_get_mute_cb;
1543     u->source->userdata = u;
1544
1545     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1546
1547     /* Create sink */
1548     pa_sink_new_data_init(&sink_data);
1549     sink_data.driver = __FILE__;
1550     sink_data.module = m;
1551     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1552         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1553     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1554     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1555     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1556     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1557     if (!u->autoloaded)
1558         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1559     pa_proplist_sets(sink_data.proplist, "device.echo-cancel.name", sink_data.name);
1560
1561     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1562         pa_log("Invalid properties");
1563         pa_sink_new_data_done(&sink_data);
1564         goto fail;
1565     }
1566
1567     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1568         const char *z;
1569
1570         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1571         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s", sink_data.name, z ? z : sink_master->name);
1572     }
1573
1574     u->sink = pa_sink_new(m->core, &sink_data,
1575                           PA_SINK_HW_MUTE_CTRL|PA_SINK_HW_VOLUME_CTRL|PA_SINK_DECIBEL_VOLUME|
1576                           (sink_master->flags & (PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY)));
1577     pa_sink_new_data_done(&sink_data);
1578
1579     if (!u->sink) {
1580         pa_log("Failed to create sink.");
1581         goto fail;
1582     }
1583
1584     u->sink->parent.process_msg = sink_process_msg_cb;
1585     u->sink->set_state = sink_set_state_cb;
1586     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1587     u->sink->request_rewind = sink_request_rewind_cb;
1588     u->sink->set_volume = sink_set_volume_cb;
1589     u->sink->set_mute = sink_set_mute_cb;
1590     u->sink->userdata = u;
1591
1592     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1593
1594     /* Create source output */
1595     pa_source_output_new_data_init(&source_output_data);
1596     source_output_data.driver = __FILE__;
1597     source_output_data.module = m;
1598     source_output_data.source = source_master;
1599     source_output_data.destination_source = u->source;
1600     /* FIXME
1601        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1602
1603     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1604     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1605     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1606     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1607
1608     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1609     pa_source_output_new_data_done(&source_output_data);
1610
1611     if (!u->source_output)
1612         goto fail;
1613
1614     u->source_output->parent.process_msg = source_output_process_msg_cb;
1615     u->source_output->push = source_output_push_cb;
1616     u->source_output->process_rewind = source_output_process_rewind_cb;
1617     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1618     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1619     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1620     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1621     u->source_output->kill = source_output_kill_cb;
1622     u->source_output->attach = source_output_attach_cb;
1623     u->source_output->detach = source_output_detach_cb;
1624     u->source_output->state_change = source_output_state_change_cb;
1625     u->source_output->may_move_to = source_output_may_move_to_cb;
1626     u->source_output->moving = source_output_moving_cb;
1627     u->source_output->userdata = u;
1628
1629     u->source->output_from_master = u->source_output;
1630
1631     /* Create sink input */
1632     pa_sink_input_new_data_init(&sink_input_data);
1633     sink_input_data.driver = __FILE__;
1634     sink_input_data.module = m;
1635     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1636     sink_input_data.origin_sink = u->sink;
1637     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1638     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1639     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1640     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1641     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1642
1643     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1644     pa_sink_input_new_data_done(&sink_input_data);
1645
1646     if (!u->sink_input)
1647         goto fail;
1648
1649     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1650     u->sink_input->pop = sink_input_pop_cb;
1651     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1652     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1653     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1654     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1655     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1656     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1657     u->sink_input->kill = sink_input_kill_cb;
1658     u->sink_input->attach = sink_input_attach_cb;
1659     u->sink_input->detach = sink_input_detach_cb;
1660     u->sink_input->state_change = sink_input_state_change_cb;
1661     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1662     u->sink_input->moving = sink_input_moving_cb;
1663     u->sink_input->volume_changed = sink_input_volume_changed_cb;
1664     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1665     u->sink_input->userdata = u;
1666
1667     u->sink->input_to_master = u->sink_input;
1668
1669     pa_sink_input_get_silence(u->sink_input, &silence);
1670
1671     u->source_memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0,
1672         pa_frame_size(&source_ss), 1, 1, 0, &silence);
1673     u->sink_memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0,
1674         pa_frame_size(&sink_ss), 1, 1, 0, &silence);
1675
1676     pa_memblock_unref(silence.memblock);
1677
1678     if (!u->source_memblockq || !u->sink_memblockq) {
1679         pa_log("Failed to create memblockq.");
1680         goto fail;
1681     }
1682
1683     /* our source and sink are not suspended when we create them */
1684     u->active_mask = 3;
1685
1686     if (u->adjust_time > 0)
1687         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1688
1689     if (u->save_aec) {
1690         pa_log("Creating AEC files in /tmp");
1691         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1692         if (u->captured_file == NULL)
1693             perror ("fopen failed");
1694         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1695         if (u->played_file == NULL)
1696             perror ("fopen failed");
1697         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1698         if (u->canceled_file == NULL)
1699             perror ("fopen failed");
1700     }
1701
1702     pa_sink_put(u->sink);
1703     pa_source_put(u->source);
1704
1705     pa_sink_input_put(u->sink_input);
1706     pa_source_output_put(u->source_output);
1707
1708     pa_modargs_free(ma);
1709
1710     return 0;
1711
1712 fail:
1713     if (ma)
1714         pa_modargs_free(ma);
1715
1716     pa__done(m);
1717
1718     return -1;
1719 }
1720
1721 int pa__get_n_used(pa_module *m) {
1722     struct userdata *u;
1723
1724     pa_assert(m);
1725     pa_assert_se(u = m->userdata);
1726
1727     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1728 }
1729
1730 void pa__done(pa_module*m) {
1731     struct userdata *u;
1732
1733     pa_assert(m);
1734
1735     if (!(u = m->userdata))
1736         return;
1737
1738     /* See comments in source_output_kill_cb() above regarding
1739      * destruction order! */
1740
1741     if (u->time_event)
1742         u->core->mainloop->time_free(u->time_event);
1743
1744     if (u->source_output)
1745         pa_source_output_unlink(u->source_output);
1746     if (u->sink_input)
1747         pa_sink_input_unlink(u->sink_input);
1748
1749     if (u->source)
1750         pa_source_unlink(u->source);
1751     if (u->sink)
1752         pa_sink_unlink(u->sink);
1753
1754     if (u->source_output)
1755         pa_source_output_unref(u->source_output);
1756     if (u->sink_input)
1757         pa_sink_input_unref(u->sink_input);
1758
1759     if (u->source)
1760         pa_source_unref(u->source);
1761     if (u->sink)
1762         pa_sink_unref(u->sink);
1763
1764     if (u->source_memblockq)
1765         pa_memblockq_free(u->source_memblockq);
1766     if (u->sink_memblockq)
1767         pa_memblockq_free(u->sink_memblockq);
1768
1769     if (u->ec->pp_state)
1770         speex_preprocess_state_destroy(u->ec->pp_state);
1771
1772     if (u->ec) {
1773         if (u->ec->done)
1774             u->ec->done(u->ec);
1775
1776         pa_xfree(u->ec);
1777     }
1778
1779     if (u->asyncmsgq)
1780         pa_asyncmsgq_unref(u->asyncmsgq);
1781
1782     pa_xfree(u);
1783 }