* (strings) and values should be GstPulseContext pointers. */
static GHashTable *gst_pulse_shared_contexts = NULL;
+static GMutex *pa_shared_ressource_mutex = NULL;
+
/* We keep a custom ringbuffer that is backed up by data allocated by
* pulseaudio. We must also overide the commit function to write into
* pulseaudio memory instead. */
gchar *context_name;
gchar *stream_name;
+ pa_context *context;
pa_stream *stream;
pa_sample_spec sample_spec;
G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
-static GMutex *pa_ring_buffer_mutex = NULL;
static void
-gst_pulseringbuffer_init_contexts (void)
+gst_pulsesink_init_contexts (void)
{
- g_assert (pa_ring_buffer_mutex == NULL);
- pa_ring_buffer_mutex = g_mutex_new ();
- gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal);
+ g_assert (pa_shared_ressource_mutex == NULL);
+ pa_shared_ressource_mutex = g_mutex_new ();
+ gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal,
+ g_free, NULL);
}
static void
gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
{
pbuf->stream_name = NULL;
+ pbuf->context = NULL;
pbuf->stream = NULL;
#ifdef HAVE_PULSE_0_9_13
pbuf->stream_name = NULL;
}
-static GstPulseContext *
-gst_pulsering_get_context (GstPulseRingBuffer * pbuf)
-{
- GstPulseContext *pctx;
-
- g_mutex_lock (pa_ring_buffer_mutex);
- pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
- g_mutex_unlock (pa_ring_buffer_mutex);
- return pctx;
-}
-
static void
gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
{
- GstPulseSink *psink;
+ g_mutex_lock (pa_shared_ressource_mutex);
- g_mutex_lock (pa_ring_buffer_mutex);
- psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+ GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf);
+
+ gst_pulsering_destroy_stream (pbuf);
+
+ if (pbuf->context) {
+ pa_context_unref (pbuf->context);
+ pbuf->context = NULL;
+ }
if (pbuf->context_name) {
GstPulseContext *pctx;
- GST_DEBUG_OBJECT (psink, "destroying context for pbuf=%p '%s'",
- pbuf, pbuf->context_name);
-
pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
- gst_pulsering_destroy_stream (pbuf);
+ GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p",
+ pbuf->context_name, pbuf, pctx);
if (pctx) {
pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
if (pctx->ring_buffers == NULL) {
+ GST_DEBUG_OBJECT (pbuf,
+ "destroying final context with name %s, pbuf=%p, pctx=%p",
+ pbuf->context_name, pbuf, pctx);
+
pa_context_disconnect (pctx->context);
/* Make sure we don't get any further callbacks */
#endif
g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
- g_free (pbuf->context_name);
- pbuf->context_name = NULL;
pa_context_unref (pctx->context);
g_slice_free (GstPulseContext, pctx);
}
}
+ g_free (pbuf->context_name);
+ pbuf->context_name = NULL;
}
- g_mutex_unlock (pa_ring_buffer_mutex);
+ g_mutex_unlock (pa_shared_ressource_mutex);
}
static void
ringbuffer = GST_PULSERING_BUFFER_CAST (object);
gst_pulsering_destroy_context (ringbuffer);
-
G_OBJECT_CLASS (ring_parent_class)->finalize (object);
}
static gboolean
gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
{
- GstPulseContext *pctx = gst_pulsering_get_context (pbuf);
-
- if (!pctx) {
- GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL));
- return TRUE;
- }
-
- if (!pctx->context
- || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context))
+ if (!pbuf->context
+ || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
|| !pbuf->stream
|| !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
const gchar *err_str =
- pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL;
+ pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL;
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
err_str), (NULL));
return TRUE;
static void
gst_pulsering_context_state_cb (pa_context * c, void *userdata)
{
- GstPulseSink *psink;
pa_context_state_t state;
-
- GstPulseContext *pctx = (GstPulseContext *) userdata;
- GSList *walk;
+ pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata;
state = pa_context_get_state (c);
- for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
- GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
- psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
- GST_LOG_OBJECT (psink, "got new context state %d", state);
+ GST_LOG ("got new context state %d", state);
- /* psink can be null when we are shutting down and the ringbuffer is already
- * unparented */
- if (psink == NULL)
- continue;
-
- switch (state) {
- case PA_CONTEXT_READY:
- case PA_CONTEXT_TERMINATED:
- case PA_CONTEXT_FAILED:
- GST_LOG_OBJECT (psink, "signaling");
- pa_threaded_mainloop_signal (psink->mainloop, 0);
- break;
+ switch (state) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ GST_LOG ("signaling");
+ pa_threaded_mainloop_signal (mainloop, 0);
+ break;
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
- }
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
}
}
pbuf = GST_PULSERING_BUFFER_CAST (buf);
g_assert (!pbuf->stream);
-
g_assert (psink->client_name);
if (psink->server)
pbuf->context_name = g_strdup (psink->client_name);
pa_threaded_mainloop_lock (psink->mainloop);
- g_mutex_lock (pa_ring_buffer_mutex);
+ g_mutex_lock (pa_shared_ressource_mutex);
pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
if (pctx == NULL) {
pctx = g_slice_new0 (GstPulseContext);
/* get the mainloop api and create a context */
- GST_LOG_OBJECT (psink, "new context with name %s",
- GST_STR_NULL (pbuf->context_name));
+ GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p",
+ pbuf->context_name, pbuf, pctx);
api = pa_threaded_mainloop_get_api (psink->mainloop);
if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
goto create_failed;
pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
- g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name,
- (gpointer) pctx);
+ g_hash_table_insert (gst_pulse_shared_contexts,
+ g_strdup (pbuf->context_name), (gpointer) pctx);
/* register some essential callbacks */
pa_context_set_state_callback (pctx->context,
- gst_pulsering_context_state_cb, pctx);
+ gst_pulsering_context_state_cb, psink->mainloop);
#ifdef HAVE_PULSE_0_9_12
pa_context_set_subscribe_callback (pctx->context,
gst_pulsering_context_subscribe_cb, pctx);
PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
goto connect_failed;
} else {
- GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s",
- GST_STR_NULL (pbuf->context_name));
+ GST_INFO_OBJECT (psink,
+ "reusing shared context with name %s, pbuf=%p, pctx=%p",
+ pbuf->context_name, pbuf, pctx);
pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
}
+ /* context created or shared okay */
+ pbuf->context = pa_context_ref (pctx->context);
+
for (;;) {
pa_context_state_t state;
- state = pa_context_get_state (pctx->context);
+ state = pa_context_get_state (pbuf->context);
GST_LOG_OBJECT (psink, "context state is now %d", state);
GST_LOG_OBJECT (psink, "opened the device");
- g_mutex_unlock (pa_ring_buffer_mutex);
+ g_mutex_unlock (pa_shared_ressource_mutex);
pa_threaded_mainloop_unlock (psink->mainloop);
return TRUE;
/* ERRORS */
unlock_and_fail:
{
- g_mutex_unlock (pa_ring_buffer_mutex);
+ g_mutex_unlock (pa_shared_ressource_mutex);
gst_pulsering_destroy_context (pbuf);
pa_threaded_mainloop_unlock (psink->mainloop);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("Failed to create context"), (NULL));
+ g_slice_free (GstPulseContext, pctx);
goto unlock_and_fail;
}
connect_failed:
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pa_buffer_attr wanted;
const pa_buffer_attr *actual;
pa_channel_map channel_map;
pa_threaded_mainloop_lock (psink->mainloop);
/* we need a context and a no stream */
- pctx = gst_pulsering_get_context (pbuf);
+ g_assert (pbuf->context);
g_assert (!pbuf->stream);
/* enable event notifications */
GST_LOG_OBJECT (psink, "subscribing to context events");
- if (!(o = pa_context_subscribe (pctx->context,
+ if (!(o = pa_context_subscribe (pbuf->context,
PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
goto subscribe_failed;
/* create a stream */
GST_LOG_OBJECT (psink, "creating stream with name %s", name);
if (psink->proplist) {
- if (!(pbuf->stream = pa_stream_new_with_proplist (pctx->context,
+ if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context,
name, &pbuf->sample_spec, &channel_map, psink->proplist)))
goto stream_failed;
- } else if (!(pbuf->stream = pa_stream_new (pctx->context,
+ } else if (!(pbuf->stream = pa_stream_new (pbuf->context,
name, &pbuf->sample_spec, &channel_map)))
goto stream_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_subscribe() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock_and_fail;
}
stream_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("Failed to create stream: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock_and_fail;
}
connect_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("Failed to connect stream: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock_and_fail;
}
}
{
pa_operation *o = NULL;
GstPulseSink *psink;
- GstPulseContext *pctx = NULL;
gboolean res = FALSE;
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
}
cork_failed:
{
- pctx = gst_pulsering_get_context (pbuf);
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_cork() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto cleanup;
}
}
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
guint result;
guint8 *data_end;
gboolean reverse;
}
writable_size_failed:
{
- pctx = gst_pulsering_get_context (pbuf);
-
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_writable_size() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock_and_fail;
}
write_failed:
{
- pctx = gst_pulsering_get_context (pbuf);
-
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_write() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock_and_fail;
}
}
GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
#define _do_init(type) \
- gst_pulseringbuffer_init_contexts (); \
+ gst_pulsesink_init_contexts (); \
gst_pulsesink_init_interfaces (type);
GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
pa_cvolume v;
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
uint32_t idx;
if (!psink->mainloop)
gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
- pctx = gst_pulsering_get_context (pbuf);
-
- if (!(o = pa_context_set_sink_input_volume (pctx->context, idx,
+ if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
&v, NULL, NULL)))
goto volume_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_sink_input_volume() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
{
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
uint32_t idx;
if (!psink->mainloop)
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- pctx = gst_pulsering_get_context (pbuf);
-
- if (!(o = pa_context_set_sink_input_mute (pctx->context, idx,
+ if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
mute, NULL, NULL)))
goto mute_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_sink_input_mute() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_get_volume (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pa_operation *o = NULL;
gdouble v = DEFAULT_VOLUME;
uint32_t idx;
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- pctx = gst_pulsering_get_context (pbuf);
-
- if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
+ if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
gst_pulsesink_sink_input_info_cb, pbuf)))
goto info_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_input_info() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_get_mute (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pa_operation *o = NULL;
uint32_t idx;
gboolean mute = FALSE;
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- pctx = gst_pulsering_get_context (pbuf);
-
- if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
+ if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
gst_pulsesink_sink_input_info_cb, pbuf)))
goto info_failed;
}
unlock:
-
if (o)
pa_operation_unref (o);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_input_info() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_device_description (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pa_operation *o = NULL;
gchar *t;
if (pbuf == NULL || pbuf->stream == NULL)
goto no_buffer;
- pctx = gst_pulsering_get_context (pbuf);
-
- if (!(o = pa_context_get_sink_info_by_index (pctx->context,
+ if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
pa_stream_get_device_index (pbuf->stream),
gst_pulsesink_sink_info_cb, pbuf)))
goto info_failed;
}
unlock:
-
if (o)
pa_operation_unref (o);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_info_by_index() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
{
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pa_threaded_mainloop_lock (psink->mainloop);
g_free (pbuf->stream_name);
pbuf->stream_name = g_strdup (t);
- pctx = gst_pulsering_get_context (pbuf);
-
if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
goto name_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_name() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
gboolean empty = TRUE;
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
- GstPulseContext *pctx;
pl = pa_proplist_new ();
goto finish;
pa_threaded_mainloop_lock (psink->mainloop);
-
pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
if (pbuf == NULL || pbuf->stream == NULL)
goto no_buffer;
- pctx = gst_pulsering_get_context (pbuf);
-
if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
pl, NULL, NULL)))
goto update_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_proplist_update() failed: %s",
- pa_strerror (pa_context_errno (pctx->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
{
GstPulseSink *pulsesink = GST_PULSESINK (element);
+ GstPulseSinkClass *klass = GST_PULSESINK_GET_CLASS (pulsesink);
GstStateChangeReturn ret;
guint res;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
- g_assert (pulsesink->mainloop == NULL);
- pulsesink->mainloop = pa_threaded_mainloop_new ();
- if (!pulsesink->mainloop)
- goto mainloop_failed;
- res = pa_threaded_mainloop_start (pulsesink->mainloop);
- g_assert (res == 0);
-
+ g_mutex_lock (pa_shared_ressource_mutex);
+ if (!klass->main_loop_ref_ct) {
+ GST_INFO_OBJECT (element, "new pa main loop thread");
+ klass->mainloop = pa_threaded_mainloop_new ();
+ if (!klass->mainloop)
+ goto mainloop_failed;
+ klass->main_loop_ref_ct = 1;
+ res = pa_threaded_mainloop_start (klass->mainloop);
+ g_assert (res == 0);
+ g_mutex_unlock (pa_shared_ressource_mutex);
+
+ pulsesink->mainloop = klass->mainloop;
+ } else {
+ GST_INFO_OBJECT (element, "reusing pa main loop thread");
+ pulsesink->mainloop = klass->mainloop;
+ klass->main_loop_ref_ct++;
+ g_mutex_unlock (pa_shared_ressource_mutex);
+ }
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_element_post_message (element,
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (pulsesink->mainloop) {
- pa_threaded_mainloop_stop (pulsesink->mainloop);
- pa_threaded_mainloop_free (pulsesink->mainloop);
+ g_mutex_lock (pa_shared_ressource_mutex);
+ klass->main_loop_ref_ct--;
pulsesink->mainloop = NULL;
+ if (!klass->main_loop_ref_ct) {
+ GST_INFO_OBJECT (element, "terminating pa main loop thread");
+ pa_threaded_mainloop_stop (klass->mainloop);
+ pa_threaded_mainloop_free (klass->mainloop);
+ klass->mainloop = NULL;
+ }
+ g_mutex_unlock (pa_shared_ressource_mutex);
}
break;
default:
/* ERRORS */
mainloop_failed:
{
+ g_mutex_unlock (pa_shared_ressource_mutex);
GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
("pa_threaded_mainloop_new() failed"), (NULL));
return GST_STATE_CHANGE_FAILURE;
if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
/* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
g_assert (pulsesink->mainloop);
- pa_threaded_mainloop_stop (pulsesink->mainloop);
- pa_threaded_mainloop_free (pulsesink->mainloop);
+ g_mutex_lock (pa_shared_ressource_mutex);
+ klass->main_loop_ref_ct--;
pulsesink->mainloop = NULL;
+ if (!klass->main_loop_ref_ct) {
+ GST_INFO_OBJECT (element, "terminating pa main loop thread");
+ pa_threaded_mainloop_stop (klass->mainloop);
+ pa_threaded_mainloop_free (klass->mainloop);
+ klass->mainloop = NULL;
+ }
+ g_mutex_unlock (pa_shared_ressource_mutex);
}
return ret;
}