pulsesink: rework context sharing
authorStefan Kost <ensonic@users.sf.net>
Mon, 13 Sep 2010 13:24:26 +0000 (16:24 +0300)
committerStefan Kost <ensonic@users.sf.net>
Mon, 13 Sep 2010 13:59:22 +0000 (16:59 +0300)
We also need to share the main-loop threads as this owns the context. Thus have
a class wide main-loop thread. From this we create a context per client-name.
Instead of always looking up the context, we keep this with the instance. The
reverse mapping is only needed in pulse singal handlers. This saves a lot of
locking. Also one signal handler becomes simpler as ther eis only one mainloop
to notify.

Now valgind happy - no leaks, no bad reads/writes.

This reverts major parts of commit 69a397c32f4baf07a7b2937c610f9e8f383e9ae9.

Fixes #628996

ext/pulse/pulsesink.c
ext/pulse/pulsesink.h

index d22dc72..093b91d 100644 (file)
@@ -115,6 +115,8 @@ struct _GstPulseContext
  * (strings) and values should be GstPulseContext pointers. */
 static GHashTable *gst_pulse_shared_contexts = NULL;
 
+static GMutex *pa_shared_ressource_mutex = NULL;
+
 /* We keep a custom ringbuffer that is backed up by data allocated by
  * pulseaudio. We must also overide the commit function to write into
  * pulseaudio memory instead. */
@@ -125,6 +127,7 @@ struct _GstPulseRingBuffer
   gchar *context_name;
   gchar *stream_name;
 
+  pa_context *context;
   pa_stream *stream;
 
   pa_sample_spec sample_spec;
@@ -166,13 +169,13 @@ static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
 
 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
 
-static GMutex *pa_ring_buffer_mutex = NULL;
 static void
-gst_pulseringbuffer_init_contexts (void)
+gst_pulsesink_init_contexts (void)
 {
-  g_assert (pa_ring_buffer_mutex == NULL);
-  pa_ring_buffer_mutex = g_mutex_new ();
-  gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal);
+  g_assert (pa_shared_ressource_mutex == NULL);
+  pa_shared_ressource_mutex = g_mutex_new ();
+  gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal,
+      g_free, NULL);
 }
 
 static void
@@ -210,6 +213,7 @@ static void
 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
 {
   pbuf->stream_name = NULL;
+  pbuf->context = NULL;
   pbuf->stream = NULL;
 
 #ifdef HAVE_PULSE_0_9_13
@@ -268,38 +272,35 @@ gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
   pbuf->stream_name = NULL;
 }
 
-static GstPulseContext *
-gst_pulsering_get_context (GstPulseRingBuffer * pbuf)
-{
-  GstPulseContext *pctx;
-
-  g_mutex_lock (pa_ring_buffer_mutex);
-  pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
-  g_mutex_unlock (pa_ring_buffer_mutex);
-  return pctx;
-}
-
 static void
 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
 {
-  GstPulseSink *psink;
+  g_mutex_lock (pa_shared_ressource_mutex);
 
-  g_mutex_lock (pa_ring_buffer_mutex);
-  psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+  GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf);
+
+  gst_pulsering_destroy_stream (pbuf);
+
+  if (pbuf->context) {
+    pa_context_unref (pbuf->context);
+    pbuf->context = NULL;
+  }
 
   if (pbuf->context_name) {
     GstPulseContext *pctx;
 
-    GST_DEBUG_OBJECT (psink, "destroying context for pbuf=%p '%s'",
-        pbuf, pbuf->context_name);
-
     pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
 
-    gst_pulsering_destroy_stream (pbuf);
+    GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p",
+        pbuf->context_name, pbuf, pctx);
 
     if (pctx) {
       pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
       if (pctx->ring_buffers == NULL) {
+        GST_DEBUG_OBJECT (pbuf,
+            "destroying final context with name %s, pbuf=%p, pctx=%p",
+            pbuf->context_name, pbuf, pctx);
+
         pa_context_disconnect (pctx->context);
 
         /* Make sure we don't get any further callbacks */
@@ -309,15 +310,15 @@ gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
 #endif
 
         g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
-        g_free (pbuf->context_name);
-        pbuf->context_name = NULL;
 
         pa_context_unref (pctx->context);
         g_slice_free (GstPulseContext, pctx);
       }
     }
+    g_free (pbuf->context_name);
+    pbuf->context_name = NULL;
   }
-  g_mutex_unlock (pa_ring_buffer_mutex);
+  g_mutex_unlock (pa_shared_ressource_mutex);
 }
 
 static void
@@ -328,26 +329,18 @@ gst_pulseringbuffer_finalize (GObject * object)
   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
 
   gst_pulsering_destroy_context (ringbuffer);
-
   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
 }
 
 static gboolean
 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
 {
-  GstPulseContext *pctx = gst_pulsering_get_context (pbuf);
-
-  if (!pctx) {
-    GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL));
-    return TRUE;
-  }
-
-  if (!pctx->context
-      || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context))
+  if (!pbuf->context
+      || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
       || !pbuf->stream
       || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
     const gchar *err_str =
-        pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL;
+        pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL;
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
             err_str), (NULL));
     return TRUE;
@@ -358,38 +351,26 @@ gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
 static void
 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
 {
-  GstPulseSink *psink;
   pa_context_state_t state;
-
-  GstPulseContext *pctx = (GstPulseContext *) userdata;
-  GSList *walk;
+  pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata;
 
   state = pa_context_get_state (c);
 
-  for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
-    GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
-    psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
-    GST_LOG_OBJECT (psink, "got new context state %d", state);
+  GST_LOG ("got new context state %d", state);
 
-    /* psink can be null when we are shutting down and the ringbuffer is already
-     * unparented */
-    if (psink == NULL)
-      continue;
-
-    switch (state) {
-      case PA_CONTEXT_READY:
-      case PA_CONTEXT_TERMINATED:
-      case PA_CONTEXT_FAILED:
-        GST_LOG_OBJECT (psink, "signaling");
-        pa_threaded_mainloop_signal (psink->mainloop, 0);
-        break;
+  switch (state) {
+    case PA_CONTEXT_READY:
+    case PA_CONTEXT_TERMINATED:
+    case PA_CONTEXT_FAILED:
+      GST_LOG ("signaling");
+      pa_threaded_mainloop_signal (mainloop, 0);
+      break;
 
-      case PA_CONTEXT_UNCONNECTED:
-      case PA_CONTEXT_CONNECTING:
-      case PA_CONTEXT_AUTHORIZING:
-      case PA_CONTEXT_SETTING_NAME:
-        break;
-    }
+    case PA_CONTEXT_UNCONNECTED:
+    case PA_CONTEXT_CONNECTING:
+    case PA_CONTEXT_AUTHORIZING:
+    case PA_CONTEXT_SETTING_NAME:
+      break;
   }
 }
 
@@ -443,7 +424,6 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf)
   pbuf = GST_PULSERING_BUFFER_CAST (buf);
 
   g_assert (!pbuf->stream);
-
   g_assert (psink->client_name);
 
   if (psink->server)
@@ -453,25 +433,25 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf)
     pbuf->context_name = g_strdup (psink->client_name);
 
   pa_threaded_mainloop_lock (psink->mainloop);
-  g_mutex_lock (pa_ring_buffer_mutex);
+  g_mutex_lock (pa_shared_ressource_mutex);
 
   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
   if (pctx == NULL) {
     pctx = g_slice_new0 (GstPulseContext);
 
     /* get the mainloop api and create a context */
-    GST_LOG_OBJECT (psink, "new context with name %s",
-        GST_STR_NULL (pbuf->context_name));
+    GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p",
+        pbuf->context_name, pbuf, pctx);
     api = pa_threaded_mainloop_get_api (psink->mainloop);
     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
       goto create_failed;
 
     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
-    g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name,
-        (gpointer) pctx);
+    g_hash_table_insert (gst_pulse_shared_contexts,
+        g_strdup (pbuf->context_name), (gpointer) pctx);
     /* register some essential callbacks */
     pa_context_set_state_callback (pctx->context,
-        gst_pulsering_context_state_cb, pctx);
+        gst_pulsering_context_state_cb, psink->mainloop);
 #ifdef HAVE_PULSE_0_9_12
     pa_context_set_subscribe_callback (pctx->context,
         gst_pulsering_context_subscribe_cb, pctx);
@@ -485,15 +465,19 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf)
             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
       goto connect_failed;
   } else {
-    GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s",
-        GST_STR_NULL (pbuf->context_name));
+    GST_INFO_OBJECT (psink,
+        "reusing shared context with name %s, pbuf=%p, pctx=%p",
+        pbuf->context_name, pbuf, pctx);
     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
   }
 
+  /* context created or shared okay */
+  pbuf->context = pa_context_ref (pctx->context);
+
   for (;;) {
     pa_context_state_t state;
 
-    state = pa_context_get_state (pctx->context);
+    state = pa_context_get_state (pbuf->context);
 
     GST_LOG_OBJECT (psink, "context state is now %d", state);
 
@@ -510,7 +494,7 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf)
 
   GST_LOG_OBJECT (psink, "opened the device");
 
-  g_mutex_unlock (pa_ring_buffer_mutex);
+  g_mutex_unlock (pa_shared_ressource_mutex);
   pa_threaded_mainloop_unlock (psink->mainloop);
 
   return TRUE;
@@ -518,7 +502,7 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf)
   /* ERRORS */
 unlock_and_fail:
   {
-    g_mutex_unlock (pa_ring_buffer_mutex);
+    g_mutex_unlock (pa_shared_ressource_mutex);
     gst_pulsering_destroy_context (pbuf);
 
     pa_threaded_mainloop_unlock (psink->mainloop);
@@ -528,6 +512,7 @@ create_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("Failed to create context"), (NULL));
+    g_slice_free (GstPulseContext, pctx);
     goto unlock_and_fail;
   }
 connect_failed:
@@ -725,7 +710,6 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
 {
   GstPulseSink *psink;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   pa_buffer_attr wanted;
   const pa_buffer_attr *actual;
   pa_channel_map channel_map;
@@ -749,12 +733,12 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
   pa_threaded_mainloop_lock (psink->mainloop);
 
   /* we need a context and a no stream */
-  pctx = gst_pulsering_get_context (pbuf);
+  g_assert (pbuf->context);
   g_assert (!pbuf->stream);
 
   /* enable event notifications */
   GST_LOG_OBJECT (psink, "subscribing to context events");
-  if (!(o = pa_context_subscribe (pctx->context,
+  if (!(o = pa_context_subscribe (pbuf->context,
               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
     goto subscribe_failed;
 
@@ -772,10 +756,10 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
   /* create a stream */
   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
   if (psink->proplist) {
-    if (!(pbuf->stream = pa_stream_new_with_proplist (pctx->context,
+    if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context,
                 name, &pbuf->sample_spec, &channel_map, psink->proplist)))
       goto stream_failed;
-  } else if (!(pbuf->stream = pa_stream_new (pctx->context,
+  } else if (!(pbuf->stream = pa_stream_new (pbuf->context,
               name, &pbuf->sample_spec, &channel_map)))
     goto stream_failed;
 
@@ -910,21 +894,21 @@ subscribe_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_context_subscribe() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock_and_fail;
   }
 stream_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("Failed to create stream: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock_and_fail;
   }
 connect_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("Failed to connect stream: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock_and_fail;
   }
 }
@@ -966,7 +950,6 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
 {
   pa_operation *o = NULL;
   GstPulseSink *psink;
-  GstPulseContext *pctx = NULL;
   gboolean res = FALSE;
 
   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
@@ -1002,10 +985,9 @@ server_dead:
   }
 cork_failed:
   {
-    pctx = gst_pulsering_get_context (pbuf);
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_cork() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto cleanup;
   }
 }
@@ -1264,7 +1246,6 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
 {
   GstPulseSink *psink;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   guint result;
   guint8 *data_end;
   gboolean reverse;
@@ -1636,20 +1617,16 @@ was_paused:
   }
 writable_size_failed:
   {
-    pctx = gst_pulsering_get_context (pbuf);
-
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_writable_size() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock_and_fail;
   }
 write_failed:
   {
-    pctx = gst_pulsering_get_context (pbuf);
-
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_write() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock_and_fail;
   }
 }
@@ -1676,7 +1653,7 @@ static void gst_pulsesink_init_interfaces (GType type);
 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
 
 #define _do_init(type) \
-  gst_pulseringbuffer_init_contexts (); \
+  gst_pulsesink_init_contexts (); \
   gst_pulsesink_init_interfaces (type);
 
 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
@@ -2007,7 +1984,6 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
   pa_cvolume v;
   pa_operation *o = NULL;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   uint32_t idx;
 
   if (!psink->mainloop)
@@ -2026,9 +2002,7 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
 
   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
 
-  pctx = gst_pulsering_get_context (pbuf);
-
-  if (!(o = pa_context_set_sink_input_volume (pctx->context, idx,
+  if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
               &v, NULL, NULL)))
     goto volume_failed;
 
@@ -2068,7 +2042,7 @@ volume_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_set_sink_input_volume() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2078,7 +2052,6 @@ gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
 {
   pa_operation *o = NULL;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   uint32_t idx;
 
   if (!psink->mainloop)
@@ -2095,9 +2068,7 @@ gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
     goto no_index;
 
-  pctx = gst_pulsering_get_context (pbuf);
-
-  if (!(o = pa_context_set_sink_input_mute (pctx->context, idx,
+  if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
               mute, NULL, NULL)))
     goto mute_failed;
 
@@ -2137,7 +2108,7 @@ mute_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_set_sink_input_mute() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2174,7 +2145,6 @@ static gdouble
 gst_pulsesink_get_volume (GstPulseSink * psink)
 {
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   pa_operation *o = NULL;
   gdouble v = DEFAULT_VOLUME;
   uint32_t idx;
@@ -2191,9 +2161,7 @@ gst_pulsesink_get_volume (GstPulseSink * psink)
   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
     goto no_index;
 
-  pctx = gst_pulsering_get_context (pbuf);
-
-  if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
+  if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
               gst_pulsesink_sink_input_info_cb, pbuf)))
     goto info_failed;
 
@@ -2239,7 +2207,7 @@ info_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_context_get_sink_input_info() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2248,7 +2216,6 @@ static gboolean
 gst_pulsesink_get_mute (GstPulseSink * psink)
 {
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   pa_operation *o = NULL;
   uint32_t idx;
   gboolean mute = FALSE;
@@ -2266,9 +2233,7 @@ gst_pulsesink_get_mute (GstPulseSink * psink)
   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
     goto no_index;
 
-  pctx = gst_pulsering_get_context (pbuf);
-
-  if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
+  if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
               gst_pulsesink_sink_input_info_cb, pbuf)))
     goto info_failed;
 
@@ -2279,7 +2244,6 @@ gst_pulsesink_get_mute (GstPulseSink * psink)
   }
 
 unlock:
-
   if (o)
     pa_operation_unref (o);
 
@@ -2308,7 +2272,7 @@ info_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_context_get_sink_input_info() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2343,7 +2307,6 @@ static gchar *
 gst_pulsesink_device_description (GstPulseSink * psink)
 {
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
   pa_operation *o = NULL;
   gchar *t;
 
@@ -2355,9 +2318,7 @@ gst_pulsesink_device_description (GstPulseSink * psink)
   if (pbuf == NULL || pbuf->stream == NULL)
     goto no_buffer;
 
-  pctx = gst_pulsering_get_context (pbuf);
-
-  if (!(o = pa_context_get_sink_info_by_index (pctx->context,
+  if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
               pa_stream_get_device_index (pbuf->stream),
               gst_pulsesink_sink_info_cb, pbuf)))
     goto info_failed;
@@ -2369,7 +2330,6 @@ gst_pulsesink_device_description (GstPulseSink * psink)
   }
 
 unlock:
-
   if (o)
     pa_operation_unref (o);
 
@@ -2393,7 +2353,7 @@ info_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_context_get_sink_info_by_index() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2489,7 +2449,6 @@ gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
 {
   pa_operation *o = NULL;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
 
   pa_threaded_mainloop_lock (psink->mainloop);
 
@@ -2501,8 +2460,6 @@ gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
   g_free (pbuf->stream_name);
   pbuf->stream_name = g_strdup (t);
 
-  pctx = gst_pulsering_get_context (pbuf);
-
   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
     goto name_failed;
 
@@ -2525,7 +2482,7 @@ name_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_set_name() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2551,7 +2508,6 @@ gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
   gboolean empty = TRUE;
   pa_operation *o = NULL;
   GstPulseRingBuffer *pbuf;
-  GstPulseContext *pctx;
 
   pl = pa_proplist_new ();
 
@@ -2572,13 +2528,10 @@ gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
     goto finish;
 
   pa_threaded_mainloop_lock (psink->mainloop);
-
   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
   if (pbuf == NULL || pbuf->stream == NULL)
     goto no_buffer;
 
-  pctx = gst_pulsering_get_context (pbuf);
-
   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
               pl, NULL, NULL)))
     goto update_failed;
@@ -2608,7 +2561,7 @@ update_failed:
   {
     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
         ("pa_stream_proplist_update() failed: %s",
-            pa_strerror (pa_context_errno (pctx->context))), (NULL));
+            pa_strerror (pa_context_errno (pbuf->context))), (NULL));
     goto unlock;
   }
 }
@@ -2672,18 +2625,30 @@ static GstStateChangeReturn
 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
 {
   GstPulseSink *pulsesink = GST_PULSESINK (element);
+  GstPulseSinkClass *klass = GST_PULSESINK_GET_CLASS (pulsesink);
   GstStateChangeReturn ret;
   guint res;
 
   switch (transition) {
     case GST_STATE_CHANGE_NULL_TO_READY:
-      g_assert (pulsesink->mainloop == NULL);
-      pulsesink->mainloop = pa_threaded_mainloop_new ();
-      if (!pulsesink->mainloop)
-        goto mainloop_failed;
-      res = pa_threaded_mainloop_start (pulsesink->mainloop);
-      g_assert (res == 0);
-
+      g_mutex_lock (pa_shared_ressource_mutex);
+      if (!klass->main_loop_ref_ct) {
+        GST_INFO_OBJECT (element, "new pa main loop thread");
+        klass->mainloop = pa_threaded_mainloop_new ();
+        if (!klass->mainloop)
+          goto mainloop_failed;
+        klass->main_loop_ref_ct = 1;
+        res = pa_threaded_mainloop_start (klass->mainloop);
+        g_assert (res == 0);
+        g_mutex_unlock (pa_shared_ressource_mutex);
+
+        pulsesink->mainloop = klass->mainloop;
+      } else {
+        GST_INFO_OBJECT (element, "reusing pa main loop thread");
+        pulsesink->mainloop = klass->mainloop;
+        klass->main_loop_ref_ct++;
+        g_mutex_unlock (pa_shared_ressource_mutex);
+      }
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       gst_element_post_message (element,
@@ -2706,9 +2671,16 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       if (pulsesink->mainloop) {
-        pa_threaded_mainloop_stop (pulsesink->mainloop);
-        pa_threaded_mainloop_free (pulsesink->mainloop);
+        g_mutex_lock (pa_shared_ressource_mutex);
+        klass->main_loop_ref_ct--;
         pulsesink->mainloop = NULL;
+        if (!klass->main_loop_ref_ct) {
+          GST_INFO_OBJECT (element, "terminating pa main loop thread");
+          pa_threaded_mainloop_stop (klass->mainloop);
+          pa_threaded_mainloop_free (klass->mainloop);
+          klass->mainloop = NULL;
+        }
+        g_mutex_unlock (pa_shared_ressource_mutex);
       }
       break;
     default:
@@ -2720,6 +2692,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
   /* ERRORS */
 mainloop_failed:
   {
+    g_mutex_unlock (pa_shared_ressource_mutex);
     GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
         ("pa_threaded_mainloop_new() failed"), (NULL));
     return GST_STATE_CHANGE_FAILURE;
@@ -2729,9 +2702,16 @@ state_failure:
     if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
       /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
       g_assert (pulsesink->mainloop);
-      pa_threaded_mainloop_stop (pulsesink->mainloop);
-      pa_threaded_mainloop_free (pulsesink->mainloop);
+      g_mutex_lock (pa_shared_ressource_mutex);
+      klass->main_loop_ref_ct--;
       pulsesink->mainloop = NULL;
+      if (!klass->main_loop_ref_ct) {
+        GST_INFO_OBJECT (element, "terminating pa main loop thread");
+        pa_threaded_mainloop_stop (klass->mainloop);
+        pa_threaded_mainloop_free (klass->mainloop);
+        klass->mainloop = NULL;
+      }
+      g_mutex_unlock (pa_shared_ressource_mutex);
     }
     return ret;
   }
index 83a0bed..6ddade2 100644 (file)
@@ -46,6 +46,9 @@ G_BEGIN_DECLS
   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK))
 #define GST_PULSESINK_CAST(obj) \
   ((GstPulseSink *)(obj))
+#define GST_PULSESINK_GET_CLASS(obj) \
+  (G_TYPE_INSTANCE_GET_CLASS((obj),GST_TYPE_PULSESINK,GstPulseSinkClass))
+
 
 typedef struct _GstPulseSink GstPulseSink;
 typedef struct _GstPulseSinkClass GstPulseSinkClass;
@@ -79,6 +82,9 @@ struct _GstPulseSink
 struct _GstPulseSinkClass
 {
   GstBaseAudioSinkClass parent_class;
+
+  pa_threaded_mainloop *mainloop;
+  guint main_loop_ref_ct;
 };
 
 GType gst_pulsesink_get_type (void);