echo-cancel: Don't allow streams to attach while unloading
[platform/upstream/pulseaudio.git] / src / modules / echo-cancel / module-echo-cancel.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2010 Wim Taymans <wim.taymans@gmail.com>
5
6     Based on module-virtual-sink.c
7              module-virtual-source.c
8              module-loopback.c
9
10         Copyright 2010 Intel Corporation
11         Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
12
13     PulseAudio is free software; you can redistribute it and/or modify
14     it under the terms of the GNU Lesser General Public License as published
15     by the Free Software Foundation; either version 2.1 of the License,
16     or (at your option) any later version.
17
18     PulseAudio is distributed in the hope that it will be useful, but
19     WITHOUT ANY WARRANTY; without even the implied warranty of
20     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21     General Public License for more details.
22
23     You should have received a copy of the GNU Lesser General Public License
24     along with PulseAudio; if not, write to the Free Software
25     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26     USA.
27 ***/
28
29 #ifdef HAVE_CONFIG_H
30 #include <config.h>
31 #endif
32
33 #include <stdio.h>
34
35 #include "echo-cancel.h"
36
37 #include <pulse/xmalloc.h>
38 #include <pulse/timeval.h>
39 #include <pulse/rtclock.h>
40
41 #include <pulsecore/i18n.h>
42 #include <pulsecore/atomic.h>
43 #include <pulsecore/macro.h>
44 #include <pulsecore/namereg.h>
45 #include <pulsecore/sink.h>
46 #include <pulsecore/module.h>
47 #include <pulsecore/core-rtclock.h>
48 #include <pulsecore/core-util.h>
49 #include <pulsecore/modargs.h>
50 #include <pulsecore/log.h>
51 #include <pulsecore/rtpoll.h>
52 #include <pulsecore/sample-util.h>
53 #include <pulsecore/ltdl-helper.h>
54
55 #include "module-echo-cancel-symdef.h"
56
57 PA_MODULE_AUTHOR("Wim Taymans");
58 PA_MODULE_DESCRIPTION("Echo Cancellation");
59 PA_MODULE_VERSION(PACKAGE_VERSION);
60 PA_MODULE_LOAD_ONCE(FALSE); /* NOTE(review): presumably permits several instances of this module -- confirm */
61 PA_MODULE_USAGE( /* lists the same argument names as valid_modargs[] */
62         _("source_name=<name for the source> "
63           "source_properties=<properties for the source> "
64           "source_master=<name of source to filter> "
65           "sink_name=<name for the sink> "
66           "sink_properties=<properties for the sink> "
67           "sink_master=<name of sink to filter> "
68           "adjust_time=<how often to readjust rates in s> "
69           "format=<sample format> "
70           "rate=<sample rate> "
71           "channels=<number of channels> "
72           "channel_map=<channel map> "
73           "aec_method=<implementation to use> "
74           "aec_args=<parameters for the AEC engine> "
75           "save_aec=<save AEC data in /tmp> "
76           "autoloaded=<set if this module is being loaded automatically> "
77         ));
78
79 /* NOTE: Make sure the enum and ec_table are maintained in the correct order */
80 typedef enum {
81     PA_ECHO_CANCELLER_INVALID = -1, /* negative, so it matches no ec_table[] slot */
82     PA_ECHO_CANCELLER_SPEEX = 0, /* same position as the Speex entry of ec_table[] */
83     PA_ECHO_CANCELLER_ADRIAN, /* same position as the Adrian entry of ec_table[] */
84 } pa_echo_canceller_method_t;
85
86 #define DEFAULT_ECHO_CANCELLER "speex"
87
88 static const pa_echo_canceller ec_table[] = { /* one entry per canceller implementation, in pa_echo_canceller_method_t order */
89     {
90         /* Speex */
91         .init                   = pa_speex_ec_init,
92         .run                    = pa_speex_ec_run, /* run() is what source_output_push_cb() invokes per block */
93         .done                   = pa_speex_ec_done,
94     },
95     {
96         /* Adrian Andre's NLMS implementation */
97         .init                   = pa_adrian_ec_init,
98         .run                    = pa_adrian_ec_run,
99         .done                   = pa_adrian_ec_done,
100     },
101 };
102
103 #define DEFAULT_RATE 32000 /* NOTE(review): presumably Hz, used when no rate= argument is given -- confirm */
104 #define DEFAULT_CHANNELS 1
105 #define DEFAULT_ADJUST_TIME_USEC (1*PA_USEC_PER_SEC) /* one second, expressed in microseconds */
106 #define DEFAULT_SAVE_AEC FALSE
107 #define DEFAULT_AUTOLOADED FALSE
108
109 #define MEMBLOCKQ_MAXLENGTH (16*1024*1024) /* 16 MiB */
110
111 /* This module creates a new (virtual) source and sink.
112  *
113  * The data sent to the new sink is kept in a memblockq before being
114  * forwarded to the real sink_master.
115  *
116  * Data read from source_master is matched against the saved sink data and
117  * echo canceled data is then pushed onto the new source.
118  *
119  * Both source and sink masters have their own threads to push/pull data
120  * respectively. We however perform all our actions in the source IO thread.
121  * To do this we send all played samples to the source IO thread where they
122  * are then pushed into the memblockq.
123  *
124  * Alignment is performed in two steps:
125  *
126  * 1) when something happens that requires quick adjustment of the alignment of
127  *    capture and playback samples, we perform a resync. This adjusts the
128  *    position in the playback memblock to the requested sample. Quick
129  *    adjustments include moving the playback samples before the capture
130  *    samples (because else the echo canceler does not work) or when the
131  *    playback pointer drifts too far away.
132  *
133  * 2) Periodically check the difference between capture and playback. We use a
134  *    low and high watermark for adjusting the alignment. Playback should always
135  *    be before capture and the difference should not be bigger than one frame
136  *    size. We would ideally like to resample the sink_input but most drivers
137  *    don't give enough accuracy to be able to do that right now.
138  */
139
140 struct snapshot { /* timing data sampled in both IO threads and consumed by calc_diff() */
141     pa_usec_t sink_now; /* pa_rtclock_now() when the sink side was sampled */
142     pa_usec_t sink_latency; /* latency reported by the master sink */
143     size_t sink_delay; /* length of the sink input's render_memblockq (via the resampler, if any) */
144     int64_t send_counter; /* copy of userdata.send_counter */
145
146     pa_usec_t source_now; /* pa_rtclock_now() when the source side was sampled */
147     pa_usec_t source_latency; /* latency reported by the master source */
148     size_t source_delay; /* length of the source output's delay_memblockq (via the resampler, if any) */
149     int64_t recv_counter; /* copy of userdata.recv_counter */
150     size_t rlen; /* length of source_memblockq plus the pending sink_skip */
151     size_t plen; /* length of sink_memblockq plus the pending source_skip */
152 };
153
154 struct userdata { /* per-instance state shared by all callbacks in this file */
155     pa_core *core;
156     pa_module *module;
157
158     pa_bool_t autoloaded; /* NOTE(review): presumably mirrors the autoloaded= module argument -- confirm */
159     pa_bool_t dead;
160     pa_bool_t save_aec; /* when set, the push callback dumps captured/played/canceled blocks to the FILEs below */
161
162     pa_echo_canceller *ec; /* canceller whose run() is called for every block */
163     uint32_t blocksize; /* size in bytes of the fixed blocks handed to ec->run() */
164
165     pa_bool_t need_realign;
166
167     /* to wakeup the source I/O thread */
168     pa_bool_t in_push; /* TRUE only while the push callback drains asyncmsgq */
169     pa_asyncmsgq *asyncmsgq; /* messages posted by the sink side, processed in the push callback */
170     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
171
172     pa_source *source; /* the virtual source that receives the processed data */
173     pa_bool_t source_auto_desc;
174     pa_source_output *source_output; /* our stream on the master source */
175     pa_memblockq *source_memblockq; /* echo canceler needs fixed sized chunks */
176     size_t source_skip; /* bytes of capture data still to be passed through without cancellation */
177
178     pa_sink *sink; /* the virtual sink that is rendered in the pop callback */
179     pa_bool_t sink_auto_desc;
180     pa_sink_input *sink_input; /* our stream on the master sink */
181     pa_memblockq *sink_memblockq; /* played data queued for the canceller */
182     int64_t send_counter;          /* updated in sink IO thread */
183     int64_t recv_counter; /* updated when POST/REWIND messages are handled by the source output */
184     size_t sink_skip; /* bytes of played data still to be dropped from sink_memblockq */
185
186     pa_atomic_t request_resync; /* set to 1 to make the next push callback run do_resync() */
187
188     int active_mask; /* bit 0: virtual source running, bit 1: virtual sink running */
189     pa_time_event *time_event; /* timer that drives time_callback() */
190     pa_usec_t adjust_time; /* interval between two time_callback() runs */
191
192     FILE *captured_file; /* optional dump targets, only written when save_aec is set */
193     FILE *played_file;
194     FILE *canceled_file;
195 };
196
197 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot);
198
199 static const char* const valid_modargs[] = { /* NULL-terminated list; same names as advertised in PA_MODULE_USAGE */
200     "source_name",
201     "source_properties",
202     "source_master",
203     "sink_name",
204     "sink_properties",
205     "sink_master",
206     "adjust_time",
207     "format",
208     "rate",
209     "channels",
210     "channel_map",
211     "aec_method",
212     "aec_args",
213     "save_aec",
214     "autoloaded",
215     NULL
216 };
217
218 enum { /* private message codes handled by source_output_process_msg_cb() */
219     SOURCE_OUTPUT_MESSAGE_POST = PA_SOURCE_OUTPUT_MESSAGE_MAX, /* chunk rendered by the sink side; starts after the core's codes */
220     SOURCE_OUTPUT_MESSAGE_REWIND, /* sink side rewound by 'offset' bytes */
221     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, /* fill the source half of a struct snapshot */
222     SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME /* call apply_diff_time() with 'offset' as the time difference */
223 };
224
225 enum {
226     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT
227 };
228
229 static int64_t calc_diff(struct userdata *u, struct snapshot *snapshot) {
230     int64_t buffer, diff_time, buffer_latency;
231
232     /* get the number of samples between capture and playback */
233     if (snapshot->plen > snapshot->rlen)
234         buffer = snapshot->plen - snapshot->rlen;
235     else
236         buffer = 0;
237
238     buffer += snapshot->source_delay + snapshot->sink_delay;
239
240     /* add the amount of samples not yet transferred to the source context */
241     if (snapshot->recv_counter <= snapshot->send_counter)
242         buffer += (int64_t) (snapshot->send_counter - snapshot->recv_counter);
243     else
244         buffer += PA_CLIP_SUB(buffer, (int64_t) (snapshot->recv_counter - snapshot->send_counter));
245
246     /* convert to time */
247     buffer_latency = pa_bytes_to_usec(buffer, &u->source_output->sample_spec);
248
249     /* capture and playback samples are perfectly aligned when diff_time is 0 */
250     diff_time = (snapshot->sink_now + snapshot->sink_latency - buffer_latency) -
251           (snapshot->source_now - snapshot->source_latency);
252
253     pa_log_debug("diff %lld (%lld - %lld + %lld) %lld %lld %lld %lld", (long long) diff_time,
254         (long long) snapshot->sink_latency,
255         (long long) buffer_latency, (long long) snapshot->source_latency,
256         (long long) snapshot->source_delay, (long long) snapshot->sink_delay,
257         (long long) (snapshot->send_counter - snapshot->recv_counter),
258         (long long) (snapshot->sink_now - snapshot->source_now));
259
260     return diff_time;
261 }
262
263 /* Called from main context */
264 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
265     struct userdata *u = userdata;
266     uint32_t old_rate, base_rate, new_rate;
267     int64_t diff_time;
268     /*size_t fs*/
269     struct snapshot latency_snapshot;
270
271     pa_assert(u);
272     pa_assert(a);
273     pa_assert(u->time_event == e);
274     pa_assert_ctl_context();
275
276     if (u->active_mask != 3)
277         return;
278
279     /* update our snapshots */
280     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
281     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
282
283     /* calculate drift between capture and playback */
284     diff_time = calc_diff(u, &latency_snapshot);
285
286     /*fs = pa_frame_size(&u->source_output->sample_spec);*/
287     old_rate = u->sink_input->sample_spec.rate;
288     base_rate = u->source_output->sample_spec.rate;
289
290     if (diff_time < 0) {
291         /* recording before playback, we need to adjust quickly. The echo
292          * canceler does not work in this case. */
293         pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
294             NULL, diff_time, NULL, NULL);
295         /*new_rate = base_rate - ((pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
296         new_rate = base_rate;
297     }
298     else {
299         if (diff_time > 1000) {
300             /* diff too big, quickly adjust */
301             pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME,
302                 NULL, diff_time, NULL, NULL);
303         }
304
305         /* recording behind playback, we need to slowly adjust the rate to match */
306         /*new_rate = base_rate + ((pa_usec_to_bytes(diff_time, &u->source_output->sample_spec) / fs) * PA_USEC_PER_SEC) / u->adjust_time;*/
307
308         /* assume equal samplerates for now */
309         new_rate = base_rate;
310     }
311
312     /* make sure we don't make too big adjustments because that sounds horrible */
313     if (new_rate > base_rate * 1.1 || new_rate < base_rate * 0.9)
314         new_rate = base_rate;
315
316     if (new_rate != old_rate) {
317         pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
318
319         pa_sink_input_set_rate(u->sink_input, new_rate);
320     }
321
322     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
323 }
324
325 /* Called from source I/O thread context */
326 static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
327     struct userdata *u = PA_SOURCE(o)->userdata;
328
329     switch (code) {
330
331         case PA_SOURCE_MESSAGE_GET_LATENCY:
332
333             /* The source is _put() before the source output is, so let's
334              * make sure we don't access it in that time. Also, the
335              * source output is first shut down, the source second. */
336             if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
337                 !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state)) {
338                 *((pa_usec_t*) data) = 0;
339                 return 0;
340             }
341
342             *((pa_usec_t*) data) =
343
344                 /* Get the latency of the master source */
345                 pa_source_get_latency_within_thread(u->source_output->source) +
346                 /* Add the latency internal to our source output on top */
347                 pa_bytes_to_usec(pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq), &u->source_output->source->sample_spec) +
348                 /* and the buffering we do on the source */
349                 pa_bytes_to_usec(u->blocksize, &u->source_output->source->sample_spec);
350
351             return 0;
352
353     }
354
355     return pa_source_process_msg(o, code, data, offset, chunk);
356 }
357
358 /* Called from sink I/O thread context */
359 static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
360     struct userdata *u = PA_SINK(o)->userdata;
361
362     switch (code) {
363
364         case PA_SINK_MESSAGE_GET_LATENCY:
365
366             /* The sink is _put() before the sink input is, so let's
367              * make sure we don't access it in that time. Also, the
368              * sink input is first shut down, the sink second. */
369             if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
370                 !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state)) {
371                 *((pa_usec_t*) data) = 0;
372                 return 0;
373             }
374
375             *((pa_usec_t*) data) =
376
377                 /* Get the latency of the master sink */
378                 pa_sink_get_latency_within_thread(u->sink_input->sink) +
379
380                 /* Add the latency internal to our sink input on top */
381                 pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec);
382
383             return 0;
384     }
385
386     return pa_sink_process_msg(o, code, data, offset, chunk);
387 }
388
389
390 /* Called from main context */
391 static int source_set_state_cb(pa_source *s, pa_source_state_t state) {
392     struct userdata *u;
393
394     pa_source_assert_ref(s);
395     pa_assert_se(u = s->userdata);
396
397     if (!PA_SOURCE_IS_LINKED(state) ||
398         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
399         return 0;
400
401     pa_log_debug("Source state %d %d", state, u->active_mask);
402
403     if (state == PA_SOURCE_RUNNING) {
404         /* restart timer when both sink and source are active */
405         u->active_mask |= 1;
406         if (u->active_mask == 3)
407             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
408
409         pa_atomic_store(&u->request_resync, 1);
410         pa_source_output_cork(u->source_output, FALSE);
411     } else if (state == PA_SOURCE_SUSPENDED) {
412         u->active_mask &= ~1;
413         pa_source_output_cork(u->source_output, TRUE);
414     }
415     return 0;
416 }
417
418 /* Called from main context */
419 static int sink_set_state_cb(pa_sink *s, pa_sink_state_t state) {
420     struct userdata *u;
421
422     pa_sink_assert_ref(s);
423     pa_assert_se(u = s->userdata);
424
425     if (!PA_SINK_IS_LINKED(state) ||
426         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
427         return 0;
428
429     pa_log_debug("Sink state %d %d", state, u->active_mask);
430
431     if (state == PA_SINK_RUNNING) {
432         /* restart timer when both sink and source are active */
433         u->active_mask |= 2;
434         if (u->active_mask == 3)
435             pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
436
437         pa_atomic_store(&u->request_resync, 1);
438         pa_sink_input_cork(u->sink_input, FALSE);
439     } else if (state == PA_SINK_SUSPENDED) {
440         u->active_mask &= ~2;
441         pa_sink_input_cork(u->sink_input, TRUE);
442     }
443     return 0;
444 }
445
446 /* Called from I/O thread context */
447 static void source_update_requested_latency_cb(pa_source *s) {
448     struct userdata *u;
449
450     pa_source_assert_ref(s);
451     pa_assert_se(u = s->userdata);
452
453     if (!PA_SOURCE_IS_LINKED(u->source->thread_info.state) ||
454         !PA_SOURCE_OUTPUT_IS_LINKED(u->source_output->thread_info.state))
455         return;
456
457     pa_log_debug("Source update requested latency");
458
459     /* Just hand this one over to the master source */
460     pa_source_output_set_requested_latency_within_thread(
461             u->source_output,
462             pa_source_get_requested_latency_within_thread(s));
463 }
464
465 /* Called from I/O thread context */
466 static void sink_update_requested_latency_cb(pa_sink *s) {
467     struct userdata *u;
468
469     pa_sink_assert_ref(s);
470     pa_assert_se(u = s->userdata);
471
472     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
473         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
474         return;
475
476     pa_log_debug("Sink update requested latency");
477
478     /* Just hand this one over to the master sink */
479     pa_sink_input_set_requested_latency_within_thread(
480             u->sink_input,
481             pa_sink_get_requested_latency_within_thread(s));
482 }
483
484 /* Called from I/O thread context */
485 static void sink_request_rewind_cb(pa_sink *s) {
486     struct userdata *u;
487
488     pa_sink_assert_ref(s);
489     pa_assert_se(u = s->userdata);
490
491     if (!PA_SINK_IS_LINKED(u->sink->thread_info.state) ||
492         !PA_SINK_INPUT_IS_LINKED(u->sink_input->thread_info.state))
493         return;
494
495     pa_log_debug("Sink request rewind %lld", (long long) s->thread_info.rewind_nbytes);
496
497     /* Just hand this one over to the master sink */
498     pa_sink_input_request_rewind(u->sink_input,
499                                  s->thread_info.rewind_nbytes, TRUE, FALSE, FALSE);
500 }
501
502 /* Called from main context */
503 static void source_set_volume_cb(pa_source *s) {
504     struct userdata *u;
505
506     pa_source_assert_ref(s);
507     pa_assert_se(u = s->userdata);
508
509     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
510         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
511         return;
512
513     pa_source_output_set_volume(u->source_output, &s->real_volume, s->save_volume, TRUE);
514 }
515
516 /* Called from main context */
517 static void sink_set_volume_cb(pa_sink *s) {
518     struct userdata *u;
519
520     pa_sink_assert_ref(s);
521     pa_assert_se(u = s->userdata);
522
523     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
524         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
525         return;
526
527     pa_sink_input_set_volume(u->sink_input, &s->real_volume, s->save_volume, TRUE);
528 }
529
530 static void source_get_volume_cb(pa_source *s) {
531     struct userdata *u;
532     pa_cvolume v;
533
534     pa_source_assert_ref(s);
535     pa_assert_se(u = s->userdata);
536
537     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
538         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
539         return;
540
541     pa_source_output_get_volume(u->source_output, &v, TRUE);
542
543     if (pa_cvolume_equal(&s->real_volume, &v))
544         /* no change */
545         return;
546
547     s->real_volume = v;
548     pa_source_set_soft_volume(s, NULL);
549 }
550
551 /* Called from main context */
552 static void source_set_mute_cb(pa_source *s) {
553     struct userdata *u;
554
555     pa_source_assert_ref(s);
556     pa_assert_se(u = s->userdata);
557
558     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
559         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
560         return;
561
562     pa_source_output_set_mute(u->source_output, s->muted, s->save_muted);
563 }
564
565 /* Called from main context */
566 static void sink_set_mute_cb(pa_sink *s) {
567     struct userdata *u;
568
569     pa_sink_assert_ref(s);
570     pa_assert_se(u = s->userdata);
571
572     if (!PA_SINK_IS_LINKED(pa_sink_get_state(s)) ||
573         !PA_SINK_INPUT_IS_LINKED(pa_sink_input_get_state(u->sink_input)))
574         return;
575
576     pa_sink_input_set_mute(u->sink_input, s->muted, s->save_muted);
577 }
578
579 /* Called from main context */
580 static void source_get_mute_cb(pa_source *s) {
581     struct userdata *u;
582
583     pa_source_assert_ref(s);
584     pa_assert_se(u = s->userdata);
585
586     if (!PA_SOURCE_IS_LINKED(pa_source_get_state(s)) ||
587         !PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output)))
588         return;
589
590     pa_source_output_get_mute(u->source_output);
591 }
592
593 /* must be called from the input thread context */
594 static void apply_diff_time(struct userdata *u, int64_t diff_time) {
595     int64_t diff;
596
597     if (diff_time < 0) {
598         diff = pa_usec_to_bytes(-diff_time, &u->source_output->sample_spec);
599
600         if (diff > 0) {
601             /* add some extra safety samples to compensate for jitter in the
602              * timings */
603             diff += 10 * pa_frame_size (&u->source_output->sample_spec);
604
605             pa_log("Playback after capture (%lld), drop sink %lld", (long long) diff_time, (long long) diff);
606
607             u->sink_skip = diff;
608             u->source_skip = 0;
609         }
610     } else if (diff_time > 0) {
611         diff = pa_usec_to_bytes(diff_time, &u->source_output->sample_spec);
612
613         if (diff > 0) {
614             pa_log("playback too far ahead (%lld), drop source %lld", (long long) diff_time, (long long) diff);
615
616             u->source_skip = diff;
617             u->sink_skip = 0;
618         }
619     }
620 }
621
622 /* must be called from the input thread */
623 static void do_resync(struct userdata *u) {
624     int64_t diff_time;
625     struct snapshot latency_snapshot;
626
627     pa_log("Doing resync");
628
629     /* update our snapshot */
630     source_output_snapshot_within_thread(u, &latency_snapshot);
631     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, &latency_snapshot, 0, NULL);
632
633     /* calculate drift between capture and playback */
634     diff_time = calc_diff(u, &latency_snapshot);
635
636     /* and adjust for the drift */
637     apply_diff_time(u, diff_time);
638 }
639
640 /* Called from input thread context */
641 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
642     struct userdata *u;
643     size_t rlen, plen;
644
645     pa_source_output_assert_ref(o);
646     pa_source_output_assert_io_context(o);
647     pa_assert_se(u = o->userdata);
648
649     if (!PA_SOURCE_OUTPUT_IS_LINKED(pa_source_output_get_state(u->source_output))) {
650         pa_log("push when no link?");
651         return;
652     }
653
654     /* handle queued messages */
655     u->in_push = TRUE;
656     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
657         ;
658     u->in_push = FALSE;
659
660     if (pa_atomic_cmpxchg (&u->request_resync, 1, 0)) {
661         do_resync(u);
662     }
663
664     pa_memblockq_push_align(u->source_memblockq, chunk);
665
666     rlen = pa_memblockq_get_length(u->source_memblockq);
667     plen = pa_memblockq_get_length(u->sink_memblockq);
668
669     while (rlen >= u->blocksize) {
670         pa_memchunk rchunk, pchunk;
671
672         /* take fixed block from recorded samples */
673         pa_memblockq_peek_fixed_size(u->source_memblockq, u->blocksize, &rchunk);
674
675         if (plen > u->blocksize && u->source_skip == 0) {
676             uint8_t *rdata, *pdata, *cdata;
677             pa_memchunk cchunk;
678
679             if (u->sink_skip) {
680                 size_t to_skip;
681
682                 if (u->sink_skip > plen)
683                     to_skip = plen;
684                 else
685                     to_skip = u->sink_skip;
686
687                 pa_memblockq_drop(u->sink_memblockq, to_skip);
688                 plen -= to_skip;
689
690                 u->sink_skip -= to_skip;
691             }
692
693             if (plen > u->blocksize && u->sink_skip == 0) {
694                 /* take fixed block from played samples */
695                 pa_memblockq_peek_fixed_size(u->sink_memblockq, u->blocksize, &pchunk);
696
697                 rdata = pa_memblock_acquire(rchunk.memblock);
698                 rdata += rchunk.index;
699                 pdata = pa_memblock_acquire(pchunk.memblock);
700                 pdata += pchunk.index;
701
702                 cchunk.index = 0;
703                 cchunk.length = u->blocksize;
704                 cchunk.memblock = pa_memblock_new(u->source->core->mempool, cchunk.length);
705                 cdata = pa_memblock_acquire(cchunk.memblock);
706
707                 if (u->save_aec) {
708                     if (u->captured_file)
709                         fwrite(rdata, 1, u->blocksize, u->captured_file);
710                     if (u->played_file)
711                         fwrite(pdata, 1, u->blocksize, u->played_file);
712                 }
713
714                 /* perform echo cancellation */
715                 u->ec->run(u->ec, rdata, pdata, cdata);
716
717                 if (u->save_aec) {
718                     if (u->canceled_file)
719                         fwrite(cdata, 1, u->blocksize, u->canceled_file);
720                 }
721
722                 pa_memblock_release(cchunk.memblock);
723                 pa_memblock_release(pchunk.memblock);
724                 pa_memblock_release(rchunk.memblock);
725
726                 /* drop consumed sink samples */
727                 pa_memblockq_drop(u->sink_memblockq, u->blocksize);
728                 pa_memblock_unref(pchunk.memblock);
729
730                 pa_memblock_unref(rchunk.memblock);
731                 /* the filtered samples now become the samples from our
732                  * source */
733                 rchunk = cchunk;
734
735                 plen -= u->blocksize;
736             }
737         }
738
739         /* forward the (echo-canceled) data to the virtual source */
740         pa_source_post(u->source, &rchunk);
741         pa_memblock_unref(rchunk.memblock);
742
743         pa_memblockq_drop(u->source_memblockq, u->blocksize);
744         rlen -= u->blocksize;
745
746         if (u->source_skip) {
747             if (u->source_skip > u->blocksize) {
748                 u->source_skip -= u->blocksize;
749             }
750             else {
751                 u->sink_skip += (u->blocksize - u->source_skip);
752                 u->source_skip = 0;
753             }
754         }
755     }
756 }
757
758 /* Called from I/O thread context */
759 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
760     struct userdata *u;
761
762     pa_sink_input_assert_ref(i);
763     pa_assert(chunk);
764     pa_assert_se(u = i->userdata);
765
766     if (u->sink->thread_info.rewind_requested)
767         pa_sink_process_rewind(u->sink, 0);
768
769     pa_sink_render_full(u->sink, nbytes, chunk);
770
771     if (i->thread_info.underrun_for > 0) {
772         pa_log_debug("Handling end of underrun.");
773         pa_atomic_store(&u->request_resync, 1);
774     }
775
776     /* let source thread handle the chunk. pass the sample count as well so that
777      * the source IO thread can update the right variables. */
778     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_POST,
779         NULL, 0, chunk, NULL);
780     u->send_counter += chunk->length;
781
782     return 0;
783 }
784
785 /* Called from input thread context */
786 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
787     struct userdata *u;
788
789     pa_source_output_assert_ref(o);
790     pa_source_output_assert_io_context(o);
791     pa_assert_se(u = o->userdata);
792
793     pa_source_process_rewind(u->source, nbytes);
794
795     /* go back on read side, we need to use older sink data for this */
796     pa_memblockq_rewind(u->sink_memblockq, nbytes);
797
798     /* manipulate write index */
799     pa_memblockq_seek(u->source_memblockq, -nbytes, PA_SEEK_RELATIVE, TRUE);
800
801     pa_log_debug("Source rewind (%lld) %lld", (long long) nbytes,
802         (long long) pa_memblockq_get_length (u->source_memblockq));
803 }
804
805 /* Called from I/O thread context */
806 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
807     struct userdata *u;
808
809     pa_sink_input_assert_ref(i);
810     pa_assert_se(u = i->userdata);
811
812     pa_log_debug("Sink process rewind %lld", (long long) nbytes);
813
814     pa_sink_process_rewind(u->sink, nbytes);
815
816     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
817     u->send_counter -= nbytes;
818 }
819
820 static void source_output_snapshot_within_thread(struct userdata *u, struct snapshot *snapshot) {
821     size_t delay, rlen, plen;
822     pa_usec_t now, latency;
823
824     now = pa_rtclock_now();
825     latency = pa_source_get_latency_within_thread(u->source_output->source);
826     delay = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
827
828     delay = (u->source_output->thread_info.resampler ? pa_resampler_request(u->source_output->thread_info.resampler, delay) : delay);
829     rlen = pa_memblockq_get_length(u->source_memblockq);
830     plen = pa_memblockq_get_length(u->sink_memblockq);
831
832     snapshot->source_now = now;
833     snapshot->source_latency = latency;
834     snapshot->source_delay = delay;
835     snapshot->recv_counter = u->recv_counter;
836     snapshot->rlen = rlen + u->sink_skip;
837     snapshot->plen = plen + u->source_skip;
838 }
839
840
841 /* Called from output thread context */
842 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
843     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
844
845     switch (code) {
846
847         case SOURCE_OUTPUT_MESSAGE_POST:
848
849             pa_source_output_assert_io_context(u->source_output);
850
851             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
852                 pa_memblockq_push_align(u->sink_memblockq, chunk);
853             else
854                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
855
856             u->recv_counter += (int64_t) chunk->length;
857
858             return 0;
859
860         case SOURCE_OUTPUT_MESSAGE_REWIND:
861             pa_source_output_assert_io_context(u->source_output);
862
863             /* manipulate write index, never go past what we have */
864             if (PA_SOURCE_IS_OPENED(u->source_output->source->thread_info.state))
865                 pa_memblockq_seek(u->sink_memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
866             else
867                 pa_memblockq_flush_write(u->sink_memblockq, TRUE);
868
869             pa_log_debug("Sink rewind (%lld)", (long long) offset);
870
871             u->recv_counter -= offset;
872
873             return 0;
874
875         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
876             struct snapshot *snapshot = (struct snapshot *) data;
877
878             source_output_snapshot_within_thread(u, snapshot);
879             return 0;
880         }
881
882         case SOURCE_OUTPUT_MESSAGE_APPLY_DIFF_TIME:
883             apply_diff_time(u, offset);
884             return 0;
885
886     }
887
888     return pa_source_output_process_msg(obj, code, data, offset, chunk);
889 }
890
891 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
892     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
893
894     switch (code) {
895
896         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
897             size_t delay;
898             pa_usec_t now, latency;
899             struct snapshot *snapshot = (struct snapshot *) data;
900
901             pa_sink_input_assert_io_context(u->sink_input);
902
903             now = pa_rtclock_now();
904             latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
905             delay = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
906
907             delay = (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, delay) : delay);
908
909             snapshot->sink_now = now;
910             snapshot->sink_latency = latency;
911             snapshot->sink_delay = delay;
912             snapshot->send_counter = u->send_counter;
913             return 0;
914         }
915     }
916
917     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
918 }
919
920 /* Called from I/O thread context */
921 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
922     struct userdata *u;
923
924     pa_sink_input_assert_ref(i);
925     pa_assert_se(u = i->userdata);
926
927     pa_log_debug("Sink input update max rewind %lld", (long long) nbytes);
928
929     pa_memblockq_set_maxrewind(u->sink_memblockq, nbytes);
930     pa_sink_set_max_rewind_within_thread(u->sink, nbytes);
931 }
932
933 /* Called from I/O thread context */
934 static void source_output_update_max_rewind_cb(pa_source_output *o, size_t nbytes) {
935     struct userdata *u;
936
937     pa_source_output_assert_ref(o);
938     pa_assert_se(u = o->userdata);
939
940     pa_log_debug("Source output update max rewind %lld", (long long) nbytes);
941
942     pa_source_set_max_rewind_within_thread(u->source, nbytes);
943 }
944
945 /* Called from I/O thread context */
946 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
947     struct userdata *u;
948
949     pa_sink_input_assert_ref(i);
950     pa_assert_se(u = i->userdata);
951
952     pa_log_debug("Sink input update max request %lld", (long long) nbytes);
953
954     pa_sink_set_max_request_within_thread(u->sink, nbytes);
955 }
956
957 /* Called from I/O thread context */
958 static void sink_input_update_sink_requested_latency_cb(pa_sink_input *i) {
959     struct userdata *u;
960     pa_usec_t latency;
961
962     pa_sink_input_assert_ref(i);
963     pa_assert_se(u = i->userdata);
964
965     latency = pa_sink_get_requested_latency_within_thread(i->sink);
966
967     pa_log_debug("Sink input update requested latency %lld", (long long) latency);
968 }
969
970 /* Called from I/O thread context */
971 static void source_output_update_source_requested_latency_cb(pa_source_output *o) {
972     struct userdata *u;
973     pa_usec_t latency;
974
975     pa_source_output_assert_ref(o);
976     pa_assert_se(u = o->userdata);
977
978     latency = pa_source_get_requested_latency_within_thread(o->source);
979
980     pa_log_debug("source output update requested latency %lld", (long long) latency);
981 }
982
983 /* Called from I/O thread context */
984 static void sink_input_update_sink_latency_range_cb(pa_sink_input *i) {
985     struct userdata *u;
986
987     pa_sink_input_assert_ref(i);
988     pa_assert_se(u = i->userdata);
989
990     pa_log_debug("Sink input update latency range %lld %lld",
991         (long long) i->sink->thread_info.min_latency,
992         (long long) i->sink->thread_info.max_latency);
993
994     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
995 }
996
997 /* Called from I/O thread context */
998 static void source_output_update_source_latency_range_cb(pa_source_output *o) {
999     struct userdata *u;
1000
1001     pa_source_output_assert_ref(o);
1002     pa_assert_se(u = o->userdata);
1003
1004     pa_log_debug("Source output update latency range %lld %lld",
1005         (long long) o->source->thread_info.min_latency,
1006         (long long) o->source->thread_info.max_latency);
1007
1008     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1009 }
1010
1011 /* Called from I/O thread context */
1012 static void sink_input_update_sink_fixed_latency_cb(pa_sink_input *i) {
1013     struct userdata *u;
1014
1015     pa_sink_input_assert_ref(i);
1016     pa_assert_se(u = i->userdata);
1017
1018     pa_log_debug("Sink input update fixed latency %lld",
1019         (long long) i->sink->thread_info.fixed_latency);
1020
1021     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1022 }
1023
1024 /* Called from I/O thread context */
1025 static void source_output_update_source_fixed_latency_cb(pa_source_output *o) {
1026     struct userdata *u;
1027
1028     pa_source_output_assert_ref(o);
1029     pa_assert_se(u = o->userdata);
1030
1031     pa_log_debug("Source output update fixed latency %lld",
1032         (long long) o->source->thread_info.fixed_latency);
1033
1034     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1035 }
1036
1037 /* Called from output thread context */
1038 static void source_output_attach_cb(pa_source_output *o) {
1039     struct userdata *u;
1040
1041     pa_source_output_assert_ref(o);
1042     pa_source_output_assert_io_context(o);
1043     pa_assert_se(u = o->userdata);
1044
1045     pa_source_set_rtpoll(u->source, o->source->thread_info.rtpoll);
1046     pa_source_set_latency_range_within_thread(u->source, o->source->thread_info.min_latency, o->source->thread_info.max_latency);
1047     pa_source_set_fixed_latency_within_thread(u->source, o->source->thread_info.fixed_latency);
1048     pa_source_set_max_rewind_within_thread(u->source, pa_source_output_get_max_rewind(o));
1049
1050     pa_log_debug("Source output %d attach", o->index);
1051
1052     pa_source_attach_within_thread(u->source);
1053
1054     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
1055             o->source->thread_info.rtpoll,
1056             PA_RTPOLL_LATE,
1057             u->asyncmsgq);
1058 }
1059
1060 /* Called from I/O thread context */
1061 static void sink_input_attach_cb(pa_sink_input *i) {
1062     struct userdata *u;
1063
1064     pa_sink_input_assert_ref(i);
1065     pa_assert_se(u = i->userdata);
1066
1067     pa_sink_set_rtpoll(u->sink, i->sink->thread_info.rtpoll);
1068     pa_sink_set_latency_range_within_thread(u->sink, i->sink->thread_info.min_latency, i->sink->thread_info.max_latency);
1069
1070     /* (8.1) IF YOU NEED A FIXED BLOCK SIZE ADD THE LATENCY FOR ONE
1071      * BLOCK MINUS ONE SAMPLE HERE. SEE (7) */
1072     pa_sink_set_fixed_latency_within_thread(u->sink, i->sink->thread_info.fixed_latency);
1073
1074     /* (8.2) IF YOU NEED A FIXED BLOCK SIZE ROUND
1075      * pa_sink_input_get_max_request(i) UP TO MULTIPLES OF IT
1076      * HERE. SEE (6) */
1077     pa_sink_set_max_request_within_thread(u->sink, pa_sink_input_get_max_request(i));
1078     pa_sink_set_max_rewind_within_thread(u->sink, pa_sink_input_get_max_rewind(i));
1079
1080     pa_log_debug("Sink input %d attach", i->index);
1081
1082     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
1083             i->sink->thread_info.rtpoll,
1084             PA_RTPOLL_LATE,
1085             u->asyncmsgq);
1086
1087     pa_sink_attach_within_thread(u->sink);
1088 }
1089
1090
1091 /* Called from output thread context */
1092 static void source_output_detach_cb(pa_source_output *o) {
1093     struct userdata *u;
1094
1095     pa_source_output_assert_ref(o);
1096     pa_source_output_assert_io_context(o);
1097     pa_assert_se(u = o->userdata);
1098
1099     pa_source_detach_within_thread(u->source);
1100     pa_source_set_rtpoll(u->source, NULL);
1101
1102     pa_log_debug("Source output %d detach", o->index);
1103
1104     if (u->rtpoll_item_read) {
1105         pa_rtpoll_item_free(u->rtpoll_item_read);
1106         u->rtpoll_item_read = NULL;
1107     }
1108 }
1109
1110 /* Called from I/O thread context */
1111 static void sink_input_detach_cb(pa_sink_input *i) {
1112     struct userdata *u;
1113
1114     pa_sink_input_assert_ref(i);
1115     pa_assert_se(u = i->userdata);
1116
1117     pa_sink_detach_within_thread(u->sink);
1118
1119     pa_sink_set_rtpoll(u->sink, NULL);
1120
1121     pa_log_debug("Sink input %d detach", i->index);
1122
1123     if (u->rtpoll_item_write) {
1124         pa_rtpoll_item_free(u->rtpoll_item_write);
1125         u->rtpoll_item_write = NULL;
1126     }
1127 }
1128
1129 /* Called from output thread context */
1130 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
1131     struct userdata *u;
1132
1133     pa_source_output_assert_ref(o);
1134     pa_source_output_assert_io_context(o);
1135     pa_assert_se(u = o->userdata);
1136
1137     pa_log_debug("Source output %d state %d", o->index, state);
1138 }
1139
1140 /* Called from IO thread context */
1141 static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
1142     struct userdata *u;
1143
1144     pa_sink_input_assert_ref(i);
1145     pa_assert_se(u = i->userdata);
1146
1147     pa_log_debug("Sink input %d state %d", i->index, state);
1148
1149     /* If we are added for the first time, ask for a rewinding so that
1150      * we are heard right-away. */
1151     if (PA_SINK_INPUT_IS_LINKED(state) &&
1152         i->thread_info.state == PA_SINK_INPUT_INIT) {
1153         pa_log_debug("Requesting rewind due to state change.");
1154         pa_sink_input_request_rewind(i, 0, FALSE, TRUE, TRUE);
1155     }
1156 }
1157
1158 /* Called from main thread */
1159 static void source_output_kill_cb(pa_source_output *o) {
1160     struct userdata *u;
1161
1162     pa_source_output_assert_ref(o);
1163     pa_assert_ctl_context();
1164     pa_assert_se(u = o->userdata);
1165
1166     u->dead = TRUE;
1167
1168     /* The order here matters! We first kill the source output, followed
1169      * by the source. That means the source callbacks must be protected
1170      * against an unconnected source output! */
1171     pa_source_output_unlink(u->source_output);
1172     pa_source_unlink(u->source);
1173
1174     pa_source_output_unref(u->source_output);
1175     u->source_output = NULL;
1176
1177     pa_source_unref(u->source);
1178     u->source = NULL;
1179
1180     pa_log_debug("Source output kill %d", o->index);
1181
1182     pa_module_unload_request(u->module, TRUE);
1183 }
1184
1185 /* Called from main context */
1186 static void sink_input_kill_cb(pa_sink_input *i) {
1187     struct userdata *u;
1188
1189     pa_sink_input_assert_ref(i);
1190     pa_assert_se(u = i->userdata);
1191
1192     u->dead = TRUE;
1193
1194     /* The order here matters! We first kill the sink input, followed
1195      * by the sink. That means the sink callbacks must be protected
1196      * against an unconnected sink input! */
1197     pa_sink_input_unlink(u->sink_input);
1198     pa_sink_unlink(u->sink);
1199
1200     pa_sink_input_unref(u->sink_input);
1201     u->sink_input = NULL;
1202
1203     pa_sink_unref(u->sink);
1204     u->sink = NULL;
1205
1206     pa_log_debug("Sink input kill %d", i->index);
1207
1208     pa_module_unload_request(u->module, TRUE);
1209 }
1210
1211 /* Called from main thread */
1212 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
1213     struct userdata *u;
1214
1215     pa_source_output_assert_ref(o);
1216     pa_assert_ctl_context();
1217     pa_assert_se(u = o->userdata);
1218
1219     if (u->dead)
1220         return FALSE;
1221
1222     return (u->source != dest) && (u->sink != dest->monitor_of);
1223 }
1224
1225 /* Called from main context */
1226 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
1227     struct userdata *u;
1228
1229     pa_sink_input_assert_ref(i);
1230     pa_assert_se(u = i->userdata);
1231
1232     if (u->dead)
1233         return FALSE;
1234
1235     return u->sink != dest;
1236 }
1237
1238 /* Called from main thread */
1239 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
1240     struct userdata *u;
1241
1242     pa_source_output_assert_ref(o);
1243     pa_assert_ctl_context();
1244     pa_assert_se(u = o->userdata);
1245
1246     if (dest) {
1247         pa_source_set_asyncmsgq(u->source, dest->asyncmsgq);
1248         pa_source_update_flags(u->source, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
1249     } else
1250         pa_source_set_asyncmsgq(u->source, NULL);
1251
1252     if (u->source_auto_desc && dest) {
1253         const char *z;
1254         pa_proplist *pl;
1255
1256         pl = pa_proplist_new();
1257         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1258         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s",
1259                          pa_proplist_gets(u->source->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1260
1261         pa_source_update_proplist(u->source, PA_UPDATE_REPLACE, pl);
1262         pa_proplist_free(pl);
1263     }
1264 }
1265
1266 /* Called from main context */
1267 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
1268     struct userdata *u;
1269
1270     pa_sink_input_assert_ref(i);
1271     pa_assert_se(u = i->userdata);
1272
1273     if (dest) {
1274         pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
1275         pa_sink_update_flags(u->sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, dest->flags);
1276     } else
1277         pa_sink_set_asyncmsgq(u->sink, NULL);
1278
1279     if (u->sink_auto_desc && dest) {
1280         const char *z;
1281         pa_proplist *pl;
1282
1283         pl = pa_proplist_new();
1284         z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
1285         pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s",
1286                          pa_proplist_gets(u->sink->proplist, "device.echo-cancel.name"), z ? z : dest->name);
1287
1288         pa_sink_update_proplist(u->sink, PA_UPDATE_REPLACE, pl);
1289         pa_proplist_free(pl);
1290     }
1291 }
1292
1293 /* Called from main context */
1294 static void sink_input_volume_changed_cb(pa_sink_input *i) {
1295     struct userdata *u;
1296
1297     pa_sink_input_assert_ref(i);
1298     pa_assert_se(u = i->userdata);
1299
1300     pa_sink_volume_changed(u->sink, &i->volume);
1301 }
1302
1303 /* Called from main context */
1304 static void sink_input_mute_changed_cb(pa_sink_input *i) {
1305     struct userdata *u;
1306
1307     pa_sink_input_assert_ref(i);
1308     pa_assert_se(u = i->userdata);
1309
1310     pa_sink_mute_changed(u->sink, i->muted);
1311 }
1312
1313 static pa_echo_canceller_method_t get_ec_method_from_string(const char *method) {
1314     if (pa_streq(method, "speex"))
1315         return PA_ECHO_CANCELLER_SPEEX;
1316     else if (pa_streq(method, "adrian"))
1317         return PA_ECHO_CANCELLER_ADRIAN;
1318     else
1319         return PA_ECHO_CANCELLER_INVALID;
1320 }
1321
1322 int pa__init(pa_module*m) {
1323     struct userdata *u;
1324     pa_sample_spec source_ss, sink_ss;
1325     pa_channel_map source_map, sink_map;
1326     pa_modargs *ma;
1327     pa_source *source_master=NULL;
1328     pa_sink *sink_master=NULL;
1329     pa_source_output_new_data source_output_data;
1330     pa_sink_input_new_data sink_input_data;
1331     pa_source_new_data source_data;
1332     pa_sink_new_data sink_data;
1333     pa_memchunk silence;
1334     pa_echo_canceller_method_t ec_method;
1335     uint32_t adjust_time_sec;
1336
1337     pa_assert(m);
1338
1339     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
1340         pa_log("Failed to parse module arguments.");
1341         goto fail;
1342     }
1343
1344     if (!(source_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source_master", NULL), PA_NAMEREG_SOURCE))) {
1345         pa_log("Master source not found");
1346         goto fail;
1347     }
1348     pa_assert(source_master);
1349
1350     if (!(sink_master = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink_master", NULL), PA_NAMEREG_SINK))) {
1351         pa_log("Master sink not found");
1352         goto fail;
1353     }
1354     pa_assert(sink_master);
1355
1356     source_ss = source_master->sample_spec;
1357     source_ss.rate = DEFAULT_RATE;
1358     source_ss.channels = DEFAULT_CHANNELS;
1359     pa_channel_map_init_auto(&source_map, source_ss.channels, PA_CHANNEL_MAP_DEFAULT);
1360     if (pa_modargs_get_sample_spec_and_channel_map(ma, &source_ss, &source_map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1361         pa_log("Invalid sample format specification or channel map");
1362         goto fail;
1363     }
1364
1365     sink_ss = sink_master->sample_spec;
1366     sink_map = sink_master->channel_map;
1367
1368     u = pa_xnew0(struct userdata, 1);
1369     if (!u) {
1370         pa_log("Failed to alloc userdata");
1371         goto fail;
1372     }
1373     u->core = m->core;
1374     u->module = m;
1375     m->userdata = u;
1376     u->dead = FALSE;
1377
1378     u->ec = pa_xnew0(pa_echo_canceller, 1);
1379     if (!u->ec) {
1380         pa_log("Failed to alloc echo canceller");
1381         goto fail;
1382     }
1383
1384     if ((ec_method = get_ec_method_from_string(pa_modargs_get_value(ma, "aec_method", DEFAULT_ECHO_CANCELLER))) < 0) {
1385         pa_log("Invalid echo canceller implementation");
1386         goto fail;
1387     }
1388
1389     u->ec->init = ec_table[ec_method].init;
1390     u->ec->run = ec_table[ec_method].run;
1391     u->ec->done = ec_table[ec_method].done;
1392
1393     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
1394     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
1395         pa_log("Failed to parse adjust_time value");
1396         goto fail;
1397     }
1398
1399     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
1400         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
1401     else
1402         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
1403
1404     u->save_aec = DEFAULT_SAVE_AEC;
1405     if (pa_modargs_get_value_boolean(ma, "save_aec", &u->save_aec) < 0) {
1406         pa_log("Failed to parse save_aec value");
1407         goto fail;
1408     }
1409
1410     u->autoloaded = DEFAULT_AUTOLOADED;
1411     if (pa_modargs_get_value_boolean(ma, "autoloaded", &u->autoloaded) < 0) {
1412         pa_log("Failed to parse autoloaded value");
1413         goto fail;
1414     }
1415
1416     u->asyncmsgq = pa_asyncmsgq_new(0);
1417     u->need_realign = TRUE;
1418     if (u->ec->init) {
1419         if (!u->ec->init(u->core, u->ec, &source_ss, &source_map, &sink_ss, &sink_map, &u->blocksize, pa_modargs_get_value(ma, "aec_args", NULL))) {
1420             pa_log("Failed to init AEC engine");
1421             goto fail;
1422         }
1423     }
1424
1425     /* Create source */
1426     pa_source_new_data_init(&source_data);
1427     source_data.driver = __FILE__;
1428     source_data.module = m;
1429     if (!(source_data.name = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
1430         source_data.name = pa_sprintf_malloc("%s.echo-cancel", source_master->name);
1431     pa_source_new_data_set_sample_spec(&source_data, &source_ss);
1432     pa_source_new_data_set_channel_map(&source_data, &source_map);
1433     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, source_master->name);
1434     pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1435     if (!u->autoloaded)
1436         pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1437     pa_proplist_sets(source_data.proplist, "device.echo-cancel.name", source_data.name);
1438
1439     if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
1440         pa_log("Invalid properties");
1441         pa_source_new_data_done(&source_data);
1442         goto fail;
1443     }
1444
1445     if ((u->source_auto_desc = !pa_proplist_contains(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1446         const char *z;
1447
1448         z = pa_proplist_gets(source_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1449         pa_proplist_setf(source_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Source %s on %s", source_data.name, z ? z : source_master->name);
1450     }
1451
1452     u->source = pa_source_new(m->core, &source_data,
1453                           (source_master->flags & (PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY)));
1454     pa_source_new_data_done(&source_data);
1455
1456     if (!u->source) {
1457         pa_log("Failed to create source.");
1458         goto fail;
1459     }
1460
1461     u->source->parent.process_msg = source_process_msg_cb;
1462     u->source->set_state = source_set_state_cb;
1463     u->source->update_requested_latency = source_update_requested_latency_cb;
1464     pa_source_enable_decibel_volume(u->source, TRUE);
1465     pa_source_set_get_volume_callback(u->source, source_get_volume_cb);
1466     pa_source_set_set_volume_callback(u->source, source_set_volume_cb);
1467     pa_source_set_get_mute_callback(u->source, source_get_mute_cb);
1468     pa_source_set_set_mute_callback(u->source, source_set_mute_cb);
1469     u->source->userdata = u;
1470
1471     pa_source_set_asyncmsgq(u->source, source_master->asyncmsgq);
1472
1473     /* Create sink */
1474     pa_sink_new_data_init(&sink_data);
1475     sink_data.driver = __FILE__;
1476     sink_data.module = m;
1477     if (!(sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
1478         sink_data.name = pa_sprintf_malloc("%s.echo-cancel", sink_master->name);
1479     pa_sink_new_data_set_sample_spec(&sink_data, &sink_ss);
1480     pa_sink_new_data_set_channel_map(&sink_data, &sink_map);
1481     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, sink_master->name);
1482     pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "filter");
1483     if (!u->autoloaded)
1484         pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_INTENDED_ROLES, "phone");
1485     pa_proplist_sets(sink_data.proplist, "device.echo-cancel.name", sink_data.name);
1486
1487     if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
1488         pa_log("Invalid properties");
1489         pa_sink_new_data_done(&sink_data);
1490         goto fail;
1491     }
1492
1493     if ((u->sink_auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
1494         const char *z;
1495
1496         z = pa_proplist_gets(sink_master->proplist, PA_PROP_DEVICE_DESCRIPTION);
1497         pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Echo-Cancel Sink %s on %s", sink_data.name, z ? z : sink_master->name);
1498     }
1499
1500     u->sink = pa_sink_new(m->core, &sink_data,
1501                           (sink_master->flags & (PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY)));
1502     pa_sink_new_data_done(&sink_data);
1503
1504     if (!u->sink) {
1505         pa_log("Failed to create sink.");
1506         goto fail;
1507     }
1508
1509     u->sink->parent.process_msg = sink_process_msg_cb;
1510     u->sink->set_state = sink_set_state_cb;
1511     u->sink->update_requested_latency = sink_update_requested_latency_cb;
1512     u->sink->request_rewind = sink_request_rewind_cb;
1513     pa_sink_enable_decibel_volume(u->sink, TRUE);
1514     pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb);
1515     pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb);
1516     u->sink->userdata = u;
1517
1518     pa_sink_set_asyncmsgq(u->sink, sink_master->asyncmsgq);
1519
1520     /* Create source output */
1521     pa_source_output_new_data_init(&source_output_data);
1522     source_output_data.driver = __FILE__;
1523     source_output_data.module = m;
1524     pa_source_output_new_data_set_source(&source_output_data, source_master, FALSE);
1525     source_output_data.destination_source = u->source;
1526     /* FIXME
1527        source_output_data.flags = PA_SOURCE_OUTPUT_DONT_INHIBIT_AUTO_SUSPEND; */
1528
1529     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Source Stream");
1530     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1531     pa_source_output_new_data_set_sample_spec(&source_output_data, &source_ss);
1532     pa_source_output_new_data_set_channel_map(&source_output_data, &source_map);
1533
1534     pa_source_output_new(&u->source_output, m->core, &source_output_data);
1535     pa_source_output_new_data_done(&source_output_data);
1536
1537     if (!u->source_output)
1538         goto fail;
1539
1540     u->source_output->parent.process_msg = source_output_process_msg_cb;
1541     u->source_output->push = source_output_push_cb;
1542     u->source_output->process_rewind = source_output_process_rewind_cb;
1543     u->source_output->update_max_rewind = source_output_update_max_rewind_cb;
1544     u->source_output->update_source_requested_latency = source_output_update_source_requested_latency_cb;
1545     u->source_output->update_source_latency_range = source_output_update_source_latency_range_cb;
1546     u->source_output->update_source_fixed_latency = source_output_update_source_fixed_latency_cb;
1547     u->source_output->kill = source_output_kill_cb;
1548     u->source_output->attach = source_output_attach_cb;
1549     u->source_output->detach = source_output_detach_cb;
1550     u->source_output->state_change = source_output_state_change_cb;
1551     u->source_output->may_move_to = source_output_may_move_to_cb;
1552     u->source_output->moving = source_output_moving_cb;
1553     u->source_output->userdata = u;
1554
1555     u->source->output_from_master = u->source_output;
1556
1557     /* Create sink input */
1558     pa_sink_input_new_data_init(&sink_input_data);
1559     sink_input_data.driver = __FILE__;
1560     sink_input_data.module = m;
1561     pa_sink_input_new_data_set_sink(&sink_input_data, sink_master, FALSE);
1562     sink_input_data.origin_sink = u->sink;
1563     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Echo-Cancel Sink Stream");
1564     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "filter");
1565     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &sink_ss);
1566     pa_sink_input_new_data_set_channel_map(&sink_input_data, &sink_map);
1567     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
1568
1569     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
1570     pa_sink_input_new_data_done(&sink_input_data);
1571
1572     if (!u->sink_input)
1573         goto fail;
1574
1575     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
1576     u->sink_input->pop = sink_input_pop_cb;
1577     u->sink_input->process_rewind = sink_input_process_rewind_cb;
1578     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
1579     u->sink_input->update_max_request = sink_input_update_max_request_cb;
1580     u->sink_input->update_sink_requested_latency = sink_input_update_sink_requested_latency_cb;
1581     u->sink_input->update_sink_latency_range = sink_input_update_sink_latency_range_cb;
1582     u->sink_input->update_sink_fixed_latency = sink_input_update_sink_fixed_latency_cb;
1583     u->sink_input->kill = sink_input_kill_cb;
1584     u->sink_input->attach = sink_input_attach_cb;
1585     u->sink_input->detach = sink_input_detach_cb;
1586     u->sink_input->state_change = sink_input_state_change_cb;
1587     u->sink_input->may_move_to = sink_input_may_move_to_cb;
1588     u->sink_input->moving = sink_input_moving_cb;
1589     u->sink_input->volume_changed = sink_input_volume_changed_cb;
1590     u->sink_input->mute_changed = sink_input_mute_changed_cb;
1591     u->sink_input->userdata = u;
1592
1593     u->sink->input_to_master = u->sink_input;
1594
1595     pa_sink_input_get_silence(u->sink_input, &silence);
1596
1597     u->source_memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0,
1598         pa_frame_size(&source_ss), 1, 1, 0, &silence);
1599     u->sink_memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0,
1600         pa_frame_size(&sink_ss), 1, 1, 0, &silence);
1601
1602     pa_memblock_unref(silence.memblock);
1603
1604     if (!u->source_memblockq || !u->sink_memblockq) {
1605         pa_log("Failed to create memblockq.");
1606         goto fail;
1607     }
1608
1609     /* our source and sink are not suspended when we create them */
1610     u->active_mask = 3;
1611
1612     if (u->adjust_time > 0)
1613         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
1614
1615     if (u->save_aec) {
1616         pa_log("Creating AEC files in /tmp");
1617         u->captured_file = fopen("/tmp/aec_rec.sw", "wb");
1618         if (u->captured_file == NULL)
1619             perror ("fopen failed");
1620         u->played_file = fopen("/tmp/aec_play.sw", "wb");
1621         if (u->played_file == NULL)
1622             perror ("fopen failed");
1623         u->canceled_file = fopen("/tmp/aec_out.sw", "wb");
1624         if (u->canceled_file == NULL)
1625             perror ("fopen failed");
1626     }
1627
1628     pa_sink_put(u->sink);
1629     pa_source_put(u->source);
1630
1631     pa_sink_input_put(u->sink_input);
1632     pa_source_output_put(u->source_output);
1633
1634     pa_modargs_free(ma);
1635
1636     return 0;
1637
1638 fail:
1639     if (ma)
1640         pa_modargs_free(ma);
1641
1642     pa__done(m);
1643
1644     return -1;
1645 }
1646
1647 int pa__get_n_used(pa_module *m) {
1648     struct userdata *u;
1649
1650     pa_assert(m);
1651     pa_assert_se(u = m->userdata);
1652
1653     return pa_sink_linked_by(u->sink) + pa_source_linked_by(u->source);
1654 }
1655
1656 void pa__done(pa_module*m) {
1657     struct userdata *u;
1658
1659     pa_assert(m);
1660
1661     if (!(u = m->userdata))
1662         return;
1663
1664     u->dead = TRUE;
1665
1666     /* See comments in source_output_kill_cb() above regarding
1667      * destruction order! */
1668
1669     if (u->time_event)
1670         u->core->mainloop->time_free(u->time_event);
1671
1672     if (u->source_output)
1673         pa_source_output_unlink(u->source_output);
1674     if (u->sink_input)
1675         pa_sink_input_unlink(u->sink_input);
1676
1677     if (u->source)
1678         pa_source_unlink(u->source);
1679     if (u->sink)
1680         pa_sink_unlink(u->sink);
1681
1682     if (u->source_output)
1683         pa_source_output_unref(u->source_output);
1684     if (u->sink_input)
1685         pa_sink_input_unref(u->sink_input);
1686
1687     if (u->source)
1688         pa_source_unref(u->source);
1689     if (u->sink)
1690         pa_sink_unref(u->sink);
1691
1692     if (u->source_memblockq)
1693         pa_memblockq_free(u->source_memblockq);
1694     if (u->sink_memblockq)
1695         pa_memblockq_free(u->sink_memblockq);
1696
1697     if (u->ec) {
1698         if (u->ec->done)
1699             u->ec->done(u->ec);
1700
1701         pa_xfree(u->ec);
1702     }
1703
1704     if (u->asyncmsgq)
1705         pa_asyncmsgq_unref(u->asyncmsgq);
1706
1707     pa_xfree(u);
1708 }