Git init
[framework/multimedia/pulseaudio.git] / src / modules / module-loopback.c
1 /***
2     This file is part of PulseAudio.
3
4     Copyright 2009 Intel Corporation
5     Contributor: Pierre-Louis Bossart <pierre-louis.bossart@intel.com>
6
7     PulseAudio is free software; you can redistribute it and/or modify
8     it under the terms of the GNU Lesser General Public License as published
9     by the Free Software Foundation; either version 2.1 of the License,
10     or (at your option) any later version.
11
12     PulseAudio is distributed in the hope that it will be useful, but
13     WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15     General Public License for more details.
16
17     You should have received a copy of the GNU Lesser General Public License
18     along with PulseAudio; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
20     USA.
21 ***/
22
23 #ifdef HAVE_CONFIG_H
24 #include <config.h>
25 #endif
26
27 #include <stdio.h>
28 #include <math.h>
29
30 #include <pulse/xmalloc.h>
31
32 #include <pulsecore/sink-input.h>
33 #include <pulsecore/module.h>
34 #include <pulsecore/modargs.h>
35 #include <pulsecore/namereg.h>
36 #include <pulsecore/log.h>
37 #include <pulsecore/core-util.h>
38
39 #include <pulse/rtclock.h>
40 #include <pulse/timeval.h>
41
42 #include "module-loopback-symdef.h"
43
44 PA_MODULE_AUTHOR("Pierre-Louis Bossart");
45 PA_MODULE_DESCRIPTION("Loopback from source to sink");
46 PA_MODULE_VERSION(PACKAGE_VERSION);
47 PA_MODULE_LOAD_ONCE(FALSE);
48 PA_MODULE_USAGE(
49         "source=<source to connect to> "
50         "sink=<sink to connect to> "
51         "adjust_time=<how often to readjust rates in s> "
52         "latency_msec=<latency in ms> "
53         "format=<sample format> "
54         "rate=<sample rate> "
55         "channels=<number of channels> "
56         "channel_map=<channel map>");
57
/* Target latency in milliseconds when latency_msec= is not given */
#define DEFAULT_LATENCY_MSEC 200

/* Byte limit (16 MiB) passed as both maxlength and tlength of the
 * loopback queue created in pa__init() */
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024)

/* Interval between two rate adjustments when adjust_time= is not given */
#define DEFAULT_ADJUST_TIME_USEC (PA_USEC_PER_SEC*10)
63
64 struct userdata {
65     pa_core *core;
66     pa_module *module;
67
68     pa_sink_input *sink_input;
69     pa_source_output *source_output;
70
71     pa_asyncmsgq *asyncmsgq;
72     pa_memblockq *memblockq;
73
74     pa_rtpoll_item *rtpoll_item_read, *rtpoll_item_write;
75
76     pa_time_event *time_event;
77     pa_usec_t adjust_time;
78
79     int64_t recv_counter;
80     int64_t send_counter;
81
82     size_t skip;
83     pa_usec_t latency;
84
85     pa_bool_t in_pop;
86     size_t min_memblockq_length;
87
88     struct {
89         int64_t send_counter;
90         size_t source_output_buffer;
91         pa_usec_t source_latency;
92
93         int64_t recv_counter;
94         size_t sink_input_buffer;
95         pa_usec_t sink_latency;
96
97         size_t min_memblockq_length;
98         size_t max_request;
99     } latency_snapshot;
100 };
101
/* Argument keys handed to pa_modargs_new() in pa__init() for validating
 * the module argument string. Every key that pa__init() queries has to
 * appear here; "adjust_time" is documented in the usage string and read
 * by pa__init(), so it must be listed as well. */
static const char* const valid_modargs[] = {
    "source",
    "sink",
    "adjust_time",
    "latency_msec",
    "format",
    "rate",
    "channels",
    "channel_map",
    NULL,
};
112
113 enum {
114     SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX,
115     SINK_INPUT_MESSAGE_REWIND,
116     SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT,
117     SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED
118 };
119
120 enum {
121     SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT
122 };
123
124 /* Called from main context */
125 static void teardown(struct userdata *u) {
126     pa_assert(u);
127     pa_assert_ctl_context();
128
129     if (u->sink_input)
130         pa_sink_input_unlink(u->sink_input);
131
132     if (u->source_output)
133         pa_source_output_unlink(u->source_output);
134
135     if (u->sink_input) {
136         pa_sink_input_unref(u->sink_input);
137         u->sink_input = NULL;
138     }
139
140     if (u->source_output) {
141         pa_source_output_unref(u->source_output);
142         u->source_output = NULL;
143     }
144 }
145
146 /* Called from main context */
147 static void adjust_rates(struct userdata *u) {
148     size_t buffer, fs;
149     uint32_t old_rate, base_rate, new_rate;
150     pa_usec_t buffer_latency;
151
152     pa_assert(u);
153     pa_assert_ctl_context();
154
155     pa_asyncmsgq_send(u->source_output->source->asyncmsgq, PA_MSGOBJECT(u->source_output), SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
156     pa_asyncmsgq_send(u->sink_input->sink->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL);
157
158     buffer =
159         u->latency_snapshot.sink_input_buffer +
160         u->latency_snapshot.source_output_buffer;
161
162     if (u->latency_snapshot.recv_counter <= u->latency_snapshot.send_counter)
163         buffer += (size_t) (u->latency_snapshot.send_counter - u->latency_snapshot.recv_counter);
164     else
165         buffer += PA_CLIP_SUB(buffer, (size_t) (u->latency_snapshot.recv_counter - u->latency_snapshot.send_counter));
166
167     buffer_latency = pa_bytes_to_usec(buffer, &u->sink_input->sample_spec);
168
169     pa_log_info("Loopback overall latency is %0.2f ms + %0.2f ms + %0.2f ms = %0.2f ms",
170                 (double) u->latency_snapshot.sink_latency / PA_USEC_PER_MSEC,
171                 (double) buffer_latency / PA_USEC_PER_MSEC,
172                 (double) u->latency_snapshot.source_latency / PA_USEC_PER_MSEC,
173                 ((double) u->latency_snapshot.sink_latency + buffer_latency + u->latency_snapshot.source_latency) / PA_USEC_PER_MSEC);
174
175     pa_log_info("Should buffer %zu bytes, buffered at minimum %zu bytes",
176                 u->latency_snapshot.max_request*2,
177                 u->latency_snapshot.min_memblockq_length);
178
179     fs = pa_frame_size(&u->sink_input->sample_spec);
180     old_rate = u->sink_input->sample_spec.rate;
181     base_rate = u->source_output->sample_spec.rate;
182
183     if (u->latency_snapshot.min_memblockq_length < u->latency_snapshot.max_request*2)
184         new_rate = base_rate - (((u->latency_snapshot.max_request*2 - u->latency_snapshot.min_memblockq_length) / fs) *PA_USEC_PER_SEC)/u->adjust_time;
185     else
186         new_rate = base_rate + (((u->latency_snapshot.min_memblockq_length - u->latency_snapshot.max_request*2) / fs) *PA_USEC_PER_SEC)/u->adjust_time;
187
188     pa_log_info("Old rate %lu Hz, new rate %lu Hz", (unsigned long) old_rate, (unsigned long) new_rate);
189
190     pa_sink_input_set_rate(u->sink_input, new_rate);
191
192     pa_core_rttime_restart(u->core, u->time_event, pa_rtclock_now() + u->adjust_time);
193 }
194
195 /* Called from main context */
196 static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
197     struct userdata *u = userdata;
198
199     pa_assert(u);
200     pa_assert(a);
201     pa_assert(u->time_event == e);
202
203     adjust_rates(u);
204 }
205
206 /* Called from input thread context */
207 static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
208     struct userdata *u;
209     pa_memchunk copy;
210
211     pa_source_output_assert_ref(o);
212     pa_source_output_assert_io_context(o);
213     pa_assert_se(u = o->userdata);
214
215     if (u->skip > chunk->length) {
216         u->skip -= chunk->length;
217         return;
218     }
219
220     if (u->skip > 0) {
221         copy = *chunk;
222         copy.index += u->skip;
223         copy.length -= u->skip;
224         u->skip = 0;
225
226         chunk = &copy;
227     }
228
229     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, chunk, NULL);
230     u->send_counter += (int64_t) chunk->length;
231 }
232
233 /* Called from input thread context */
234 static void source_output_process_rewind_cb(pa_source_output *o, size_t nbytes) {
235     struct userdata *u;
236
237     pa_source_output_assert_ref(o);
238     pa_source_output_assert_io_context(o);
239     pa_assert_se(u = o->userdata);
240
241     pa_asyncmsgq_post(u->asyncmsgq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_REWIND, NULL, (int64_t) nbytes, NULL, NULL);
242     u->send_counter -= (int64_t) nbytes;
243 }
244
245 /* Called from output thread context */
246 static int source_output_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
247     struct userdata *u = PA_SOURCE_OUTPUT(obj)->userdata;
248
249     switch (code) {
250
251         case SOURCE_OUTPUT_MESSAGE_LATENCY_SNAPSHOT: {
252             size_t length;
253
254             length = pa_memblockq_get_length(u->source_output->thread_info.delay_memblockq);
255
256             u->latency_snapshot.send_counter = u->send_counter;
257             u->latency_snapshot.source_output_buffer = u->source_output->thread_info.resampler ? pa_resampler_result(u->source_output->thread_info.resampler, length) : length;
258             u->latency_snapshot.source_latency = pa_source_get_latency_within_thread(u->source_output->source);
259
260             return 0;
261         }
262     }
263
264     return pa_source_output_process_msg(obj, code, data, offset, chunk);
265 }
266
267 /* Called from output thread context */
268 static void source_output_attach_cb(pa_source_output *o) {
269     struct userdata *u;
270
271     pa_source_output_assert_ref(o);
272     pa_source_output_assert_io_context(o);
273     pa_assert_se(u = o->userdata);
274
275     u->rtpoll_item_write = pa_rtpoll_item_new_asyncmsgq_write(
276             o->source->thread_info.rtpoll,
277             PA_RTPOLL_LATE,
278             u->asyncmsgq);
279 }
280
281 /* Called from output thread context */
282 static void source_output_detach_cb(pa_source_output *o) {
283     struct userdata *u;
284
285     pa_source_output_assert_ref(o);
286     pa_source_output_assert_io_context(o);
287     pa_assert_se(u = o->userdata);
288
289     if (u->rtpoll_item_write) {
290         pa_rtpoll_item_free(u->rtpoll_item_write);
291         u->rtpoll_item_write = NULL;
292     }
293 }
294
295 /* Called from output thread context */
296 static void source_output_state_change_cb(pa_source_output *o, pa_source_output_state_t state) {
297     struct userdata *u;
298
299     pa_source_output_assert_ref(o);
300     pa_source_output_assert_io_context(o);
301     pa_assert_se(u = o->userdata);
302
303     if (PA_SOURCE_OUTPUT_IS_LINKED(state) && o->thread_info.state == PA_SOURCE_OUTPUT_INIT) {
304
305         u->skip = pa_usec_to_bytes(PA_CLIP_SUB(pa_source_get_latency_within_thread(o->source),
306                                                u->latency),
307                                    &o->sample_spec);
308
309         pa_log_info("Skipping %lu bytes", (unsigned long) u->skip);
310     }
311 }
312
313 /* Called from main thread */
314 static void source_output_kill_cb(pa_source_output *o) {
315     struct userdata *u;
316
317     pa_source_output_assert_ref(o);
318     pa_assert_ctl_context();
319     pa_assert_se(u = o->userdata);
320
321     teardown(u);
322     pa_module_unload_request(u->module, TRUE);
323 }
324
325 /* Called from main thread */
326 static pa_bool_t source_output_may_move_to_cb(pa_source_output *o, pa_source *dest) {
327     struct userdata *u;
328
329     pa_source_output_assert_ref(o);
330     pa_assert_ctl_context();
331     pa_assert_se(u = o->userdata);
332
333     return dest != u->sink_input->sink->monitor_source;
334 }
335
336 /* Called from main thread */
337 static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
338     pa_proplist *p;
339     const char *n;
340     struct userdata *u;
341
342     pa_source_output_assert_ref(o);
343     pa_assert_ctl_context();
344     pa_assert_se(u = o->userdata);
345
346     p = pa_proplist_new();
347     pa_proplist_setf(p, PA_PROP_MEDIA_NAME, "Loopback of %s", pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
348
349     if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
350         pa_proplist_sets(p, PA_PROP_MEDIA_ICON_NAME, n);
351
352     pa_sink_input_update_proplist(u->sink_input, PA_UPDATE_REPLACE, p);
353     pa_proplist_free(p);
354 }
355
356 /* Called from output thread context */
357 static void update_min_memblockq_length(struct userdata *u) {
358     size_t length;
359
360     pa_assert(u);
361     pa_sink_input_assert_io_context(u->sink_input);
362
363     length = pa_memblockq_get_length(u->memblockq);
364
365     if (u->min_memblockq_length == (size_t) -1 ||
366         length < u->min_memblockq_length)
367         u->min_memblockq_length = length;
368 }
369
370 /* Called from output thread context */
371 static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
372     struct userdata *u;
373
374     pa_sink_input_assert_ref(i);
375     pa_sink_input_assert_io_context(i);
376     pa_assert_se(u = i->userdata);
377     pa_assert(chunk);
378
379     u->in_pop = TRUE;
380     while (pa_asyncmsgq_process_one(u->asyncmsgq) > 0)
381         ;
382     u->in_pop = FALSE;
383
384     if (pa_memblockq_peek(u->memblockq, chunk) < 0) {
385         pa_log_info("Coud not peek into queue");
386         return -1;
387     }
388
389     chunk->length = PA_MIN(chunk->length, nbytes);
390     pa_memblockq_drop(u->memblockq, chunk->length);
391
392     update_min_memblockq_length(u);
393
394     return 0;
395 }
396
397 /* Called from output thread context */
398 static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) {
399     struct userdata *u;
400
401     pa_sink_input_assert_ref(i);
402     pa_sink_input_assert_io_context(i);
403     pa_assert_se(u = i->userdata);
404
405     pa_memblockq_rewind(u->memblockq, nbytes);
406 }
407
408 /* Called from output thread context */
409 static int sink_input_process_msg_cb(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) {
410     struct userdata *u = PA_SINK_INPUT(obj)->userdata;
411
412     switch (code) {
413
414         case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
415              pa_usec_t *r = data;
416
417              pa_sink_input_assert_io_context(u->sink_input);
418
419             *r = pa_bytes_to_usec(pa_memblockq_get_length(u->memblockq), &u->sink_input->sample_spec);
420
421             /* Fall through, the default handler will add in the extra
422              * latency added by the resampler */
423             break;
424         }
425
426         case SINK_INPUT_MESSAGE_POST:
427
428             pa_sink_input_assert_io_context(u->sink_input);
429
430             if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
431                 pa_memblockq_push_align(u->memblockq, chunk);
432             else
433                 pa_memblockq_flush_write(u->memblockq, TRUE);
434
435             update_min_memblockq_length(u);
436
437             /* Is this the end of an underrun? Then let's start things
438              * right-away */
439             if (!u->in_pop &&
440                 u->sink_input->thread_info.underrun_for > 0 &&
441                 pa_memblockq_is_readable(u->memblockq)) {
442
443                 pa_log_debug("Requesting rewind due to end of underrun.");
444                 pa_sink_input_request_rewind(u->sink_input,
445                                              (size_t) (u->sink_input->thread_info.underrun_for == (size_t) -1 ? 0 : u->sink_input->thread_info.underrun_for),
446                                              FALSE, TRUE, FALSE);
447             }
448
449             u->recv_counter += (int64_t) chunk->length;
450
451             return 0;
452
453         case SINK_INPUT_MESSAGE_REWIND:
454
455             pa_sink_input_assert_io_context(u->sink_input);
456
457             if (PA_SINK_IS_OPENED(u->sink_input->sink->thread_info.state))
458                 pa_memblockq_seek(u->memblockq, -offset, PA_SEEK_RELATIVE, TRUE);
459             else
460                 pa_memblockq_flush_write(u->memblockq, TRUE);
461
462             u->recv_counter -= offset;
463
464             update_min_memblockq_length(u);
465
466             return 0;
467
468         case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: {
469             size_t length;
470
471             update_min_memblockq_length(u);
472
473             length = pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq);
474
475             u->latency_snapshot.recv_counter = u->recv_counter;
476             u->latency_snapshot.sink_input_buffer =
477                 pa_memblockq_get_length(u->memblockq) +
478                 (u->sink_input->thread_info.resampler ? pa_resampler_request(u->sink_input->thread_info.resampler, length) : length);
479             u->latency_snapshot.sink_latency = pa_sink_get_latency_within_thread(u->sink_input->sink);
480
481             u->latency_snapshot.max_request = pa_sink_input_get_max_request(u->sink_input);
482
483             u->latency_snapshot.min_memblockq_length = u->min_memblockq_length;
484             u->min_memblockq_length = (size_t) -1;
485
486             return 0;
487         }
488
489         case SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED: {
490             /* This message is sent from the IO thread to the main
491              * thread! So don't be confused. All the user cases above
492              * are executed in thread context, but this one is not! */
493
494             pa_assert_ctl_context();
495
496             if (u->adjust_time > 0)
497                 adjust_rates(u);
498             return 0;
499         }
500     }
501
502     return pa_sink_input_process_msg(obj, code, data, offset, chunk);
503 }
504
505 /* Called from output thread context */
506 static void sink_input_attach_cb(pa_sink_input *i) {
507     struct userdata *u;
508
509     pa_sink_input_assert_ref(i);
510     pa_sink_input_assert_io_context(i);
511     pa_assert_se(u = i->userdata);
512
513     u->rtpoll_item_read = pa_rtpoll_item_new_asyncmsgq_read(
514             i->sink->thread_info.rtpoll,
515             PA_RTPOLL_LATE,
516             u->asyncmsgq);
517
518     pa_memblockq_set_prebuf(u->memblockq, pa_sink_input_get_max_request(i)*2);
519     pa_memblockq_set_maxrewind(u->memblockq, pa_sink_input_get_max_rewind(i));
520
521     u->min_memblockq_length = (size_t) -1;
522 }
523
524 /* Called from output thread context */
525 static void sink_input_detach_cb(pa_sink_input *i) {
526     struct userdata *u;
527
528     pa_sink_input_assert_ref(i);
529     pa_sink_input_assert_io_context(i);
530     pa_assert_se(u = i->userdata);
531
532     if (u->rtpoll_item_read) {
533         pa_rtpoll_item_free(u->rtpoll_item_read);
534         u->rtpoll_item_read = NULL;
535     }
536 }
537
538 /* Called from output thread context */
539 static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
540     struct userdata *u;
541
542     pa_sink_input_assert_ref(i);
543     pa_sink_input_assert_io_context(i);
544     pa_assert_se(u = i->userdata);
545
546     pa_memblockq_set_maxrewind(u->memblockq, nbytes);
547 }
548
549 /* Called from output thread context */
550 static void sink_input_update_max_request_cb(pa_sink_input *i, size_t nbytes) {
551     struct userdata *u;
552
553     pa_sink_input_assert_ref(i);
554     pa_sink_input_assert_io_context(i);
555     pa_assert_se(u = i->userdata);
556
557     pa_memblockq_set_prebuf(u->memblockq, nbytes*2);
558     pa_log_info("Max request changed");
559     pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->sink_input), SINK_INPUT_MESSAGE_MAX_REQUEST_CHANGED, NULL, 0, NULL, NULL);
560 }
561
562 /* Called from main thread */
563 static void sink_input_kill_cb(pa_sink_input *i) {
564     struct userdata *u;
565
566     pa_sink_input_assert_ref(i);
567     pa_assert_ctl_context();
568     pa_assert_se(u = i->userdata);
569
570     teardown(u);
571     pa_module_unload_request(u->module, TRUE);
572 }
573
574 /* Called from main thread */
575 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
576     struct userdata *u;
577     pa_proplist *p;
578     const char *n;
579
580     pa_sink_input_assert_ref(i);
581     pa_assert_ctl_context();
582     pa_assert_se(u = i->userdata);
583
584     p = pa_proplist_new();
585     pa_proplist_setf(p, PA_PROP_MEDIA_NAME, "Loopback to %s", pa_strnull(pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION)));
586
587     if ((n = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_ICON_NAME)))
588         pa_proplist_sets(p, PA_PROP_MEDIA_ICON_NAME, n);
589
590     pa_source_output_update_proplist(u->source_output, PA_UPDATE_REPLACE, p);
591     pa_proplist_free(p);
592 }
593
594 /* Called from main thread */
595 static pa_bool_t sink_input_may_move_to_cb(pa_sink_input *i, pa_sink *dest) {
596     struct userdata *u;
597
598     pa_sink_input_assert_ref(i);
599     pa_assert_ctl_context();
600     pa_assert_se(u = i->userdata);
601
602     if (!u->source_output->source->monitor_of)
603         return TRUE;
604
605     return dest != u->source_output->source->monitor_of;
606 }
607
608 int pa__init(pa_module *m) {
609     pa_modargs *ma = NULL;
610     struct userdata *u;
611     pa_sink *sink;
612     pa_sink_input_new_data sink_input_data;
613     pa_source *source;
614     pa_source_output_new_data source_output_data;
615     uint32_t latency_msec;
616     pa_sample_spec ss;
617     pa_channel_map map;
618     pa_memchunk silence;
619     uint32_t adjust_time_sec;
620     const char *n;
621
622     pa_assert(m);
623
624     if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
625         pa_log("Failed to parse module arguments");
626         goto fail;
627     }
628
629     if (!(source = pa_namereg_get(m->core, pa_modargs_get_value(ma, "source", NULL), PA_NAMEREG_SOURCE))) {
630         pa_log("No such source.");
631         goto fail;
632     }
633
634     if (!(sink = pa_namereg_get(m->core, pa_modargs_get_value(ma, "sink", NULL), PA_NAMEREG_SINK))) {
635         pa_log("No such sink.");
636         goto fail;
637     }
638
639     ss = sink->sample_spec;
640     map = sink->channel_map;
641     if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
642         pa_log("Invalid sample format specification or channel map");
643         goto fail;
644     }
645
646     latency_msec = DEFAULT_LATENCY_MSEC;
647     if (pa_modargs_get_value_u32(ma, "latency_msec", &latency_msec) < 0 || latency_msec < 1 || latency_msec > 2000) {
648         pa_log("Invalid latency specification");
649         goto fail;
650     }
651
652     m->userdata = u = pa_xnew0(struct userdata, 1);
653     u->core = m->core;
654     u->module = m;
655     u->latency = (pa_usec_t) latency_msec * PA_USEC_PER_MSEC;
656
657     adjust_time_sec = DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC;
658     if (pa_modargs_get_value_u32(ma, "adjust_time", &adjust_time_sec) < 0) {
659         pa_log("Failed to parse adjust_time value");
660         goto fail;
661     }
662
663     if (adjust_time_sec != DEFAULT_ADJUST_TIME_USEC / PA_USEC_PER_SEC)
664         u->adjust_time = adjust_time_sec * PA_USEC_PER_SEC;
665     else
666         u->adjust_time = DEFAULT_ADJUST_TIME_USEC;
667
668     pa_sink_input_new_data_init(&sink_input_data);
669     sink_input_data.driver = __FILE__;
670     sink_input_data.module = m;
671     sink_input_data.sink = sink;
672
673     pa_proplist_setf(sink_input_data.proplist, PA_PROP_MEDIA_NAME, "Loopback of %s",
674                      pa_strnull(pa_proplist_gets(source->proplist, PA_PROP_DEVICE_DESCRIPTION)));
675     if ((n = pa_proplist_gets(source->proplist, PA_PROP_DEVICE_ICON_NAME)))
676         pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ICON_NAME, n);
677     pa_proplist_sets(sink_input_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
678     pa_sink_input_new_data_set_sample_spec(&sink_input_data, &ss);
679     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
680     sink_input_data.flags = PA_SINK_INPUT_VARIABLE_RATE;
681
682     pa_sink_input_new(&u->sink_input, m->core, &sink_input_data);
683     pa_sink_input_new_data_done(&sink_input_data);
684
685     if (!u->sink_input)
686         goto fail;
687
688     u->sink_input->parent.process_msg = sink_input_process_msg_cb;
689     u->sink_input->pop = sink_input_pop_cb;
690     u->sink_input->process_rewind = sink_input_process_rewind_cb;
691     u->sink_input->kill = sink_input_kill_cb;
692     u->sink_input->attach = sink_input_attach_cb;
693     u->sink_input->detach = sink_input_detach_cb;
694     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;
695     u->sink_input->update_max_request = sink_input_update_max_request_cb;
696     u->sink_input->may_move_to = sink_input_may_move_to_cb;
697     u->sink_input->moving = sink_input_moving_cb;
698     u->sink_input->userdata = u;
699
700     pa_sink_input_set_requested_latency(u->sink_input, u->latency/3);
701
702     pa_source_output_new_data_init(&source_output_data);
703     source_output_data.driver = __FILE__;
704     source_output_data.module = m;
705     source_output_data.source = source;
706     pa_proplist_setf(source_output_data.proplist, PA_PROP_MEDIA_NAME, "Loopback to %s",
707                      pa_strnull(pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_DESCRIPTION)));
708     if ((n = pa_proplist_gets(sink->proplist, PA_PROP_DEVICE_ICON_NAME)))
709         pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ICON_NAME, n);
710     pa_proplist_sets(source_output_data.proplist, PA_PROP_MEDIA_ROLE, "abstract");
711     pa_source_output_new_data_set_sample_spec(&source_output_data, &ss);
712     pa_sink_input_new_data_set_channel_map(&sink_input_data, &map);
713
714     pa_source_output_new(&u->source_output, m->core, &source_output_data);
715     pa_source_output_new_data_done(&source_output_data);
716
717     if (!u->source_output)
718         goto fail;
719
720     u->source_output->parent.process_msg = source_output_process_msg_cb;
721     u->source_output->push = source_output_push_cb;
722     u->source_output->process_rewind = source_output_process_rewind_cb;
723     u->source_output->kill = source_output_kill_cb;
724     u->source_output->attach = source_output_attach_cb;
725     u->source_output->detach = source_output_detach_cb;
726     u->source_output->state_change = source_output_state_change_cb;
727     u->source_output->may_move_to = source_output_may_move_to_cb;
728     u->source_output->moving = source_output_moving_cb;
729     u->source_output->userdata = u;
730
731     pa_source_output_set_requested_latency(u->source_output, u->latency/3);
732
733     pa_sink_input_get_silence(u->sink_input, &silence);
734     u->memblockq = pa_memblockq_new(
735             0,                      /* idx */
736             MEMBLOCKQ_MAXLENGTH,    /* maxlength */
737             MEMBLOCKQ_MAXLENGTH,    /* tlength */
738             pa_frame_size(&ss),     /* base */
739             0,                      /* prebuf */
740             0,                      /* minreq */
741             0,                      /* maxrewind */
742             &silence);              /* silence frame */
743     pa_memblock_unref(silence.memblock);
744
745     u->asyncmsgq = pa_asyncmsgq_new(0);
746
747     pa_sink_input_put(u->sink_input);
748     pa_source_output_put(u->source_output);
749
750     if (u->adjust_time > 0)
751         u->time_event = pa_core_rttime_new(m->core, pa_rtclock_now() + u->adjust_time, time_callback, u);
752
753     pa_modargs_free(ma);
754     return 0;
755
756 fail:
757     if (ma)
758         pa_modargs_free(ma);
759
760     pa__done(m);
761
762     return -1;
763 }
764
765 void pa__done(pa_module*m) {
766     struct userdata *u;
767
768     pa_assert(m);
769
770     if (!(u = m->userdata))
771         return;
772
773     teardown(u);
774
775     if (u->memblockq)
776         pa_memblockq_free(u->memblockq);
777
778     if (u->asyncmsgq)
779         pa_asyncmsgq_unref(u->asyncmsgq);
780
781     if (u->time_event)
782         u->core->mainloop->time_free(u->time_event);
783
784     pa_xfree(u);
785 }