loopback: Flush asyncmsgq from the right context
authorTanu Kaskinen <tanuk@iki.fi>
Thu, 14 Mar 2013 20:07:14 +0000 (22:07 +0200)
committerTanu Kaskinen <tanuk@iki.fi>
Fri, 22 Mar 2013 18:49:56 +0000 (20:49 +0200)
u->asyncmsg is accessed from two IO threads. teardown() shouldn't
flush the queue from the main thread while both IO threads are still
potentially using the queue. This patch fixes that error by flushing
the queue from the sink input thread when the sink input is being
unlinked.

Flushing the queue in teardown() caused this assertion in
pa_asyncmsgq_get() to crash sometimes: pa_assert(!a->current)

src/modules/module-loopback.c

index ea24c3b..6017891 100644 (file)
@@ -138,29 +138,35 @@ static void teardown(struct userdata *u) {
     pa_assert(u);
     pa_assert_ctl_context();
 
-    if (u->asyncmsgq)
-        pa_asyncmsgq_flush(u->asyncmsgq, 0);
-
     u->adjust_time = 0;
     enable_adjust_timer(u, false);
 
-    if (u->sink_input)
-        pa_sink_input_unlink(u->sink_input);
+    /* Handling the asyncmsgq between the source output and the sink input
+     * requires some care. When the source output is unlinked, nothing needs
+     * to be done for the asyncmsgq, because the source output is the sending
+     * end. But when the sink input is unlinked, we should ensure that the
+     * asyncmsgq is emptied, because the messages in the queue hold references
+     * to the sink input. Also, we need to ensure that new messages won't be
+     * written to the queue after we have emptied it.
+     *
+     * Emptying the queue can be done in the state_changed() callback of the
+     * sink input, when the new state is "unlinked".
+     *
+     * Preventing new messages from being written to the queue can be achieved
+     * by unlinking the source output before unlinking the sink input. There
+     * are no other writers for that queue, so this is sufficient. */
 
-    if (u->source_output)
+    if (u->source_output) {
         pa_source_output_unlink(u->source_output);
+        pa_source_output_unref(u->source_output);
+        u->source_output = NULL;
+    }
 
     if (u->sink_input) {
-        u->sink_input->parent.process_msg = pa_sink_input_process_msg;
+        pa_sink_input_unlink(u->sink_input);
         pa_sink_input_unref(u->sink_input);
         u->sink_input = NULL;
     }
-
-    if (u->source_output) {
-        u->source_output->parent.process_msg = pa_source_output_process_msg;
-        pa_source_output_unref(u->source_output);
-        u->source_output = NULL;
-    }
 }
 
 /* Called from main context */
@@ -646,6 +652,17 @@ static void sink_input_kill_cb(pa_sink_input *i) {
     pa_module_unload_request(u->module, TRUE);
 }
 
+/* Called from the output thread context */
+static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
+    struct userdata *u;
+
+    pa_sink_input_assert_ref(i);
+    pa_assert_se(u = i->userdata);
+
+    if (state == PA_SINK_INPUT_UNLINKED)
+        pa_asyncmsgq_flush(u->asyncmsgq, false);
+}
+
 /* Called from main thread */
 static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
     struct userdata *u;
@@ -857,6 +874,7 @@ int pa__init(pa_module *m) {
     u->sink_input->pop = sink_input_pop_cb;
     u->sink_input->process_rewind = sink_input_process_rewind_cb;
     u->sink_input->kill = sink_input_kill_cb;
+    u->sink_input->state_change = sink_input_state_change_cb;
     u->sink_input->attach = sink_input_attach_cb;
     u->sink_input->detach = sink_input_detach_cb;
     u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;