typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
+typedef struct _GstPulseContext GstPulseContext;
+
+struct _GstPulseContext
+{
+ pa_context *context;
+ GSList *ring_buffers;
+};
+
+/* Store the PA contexts in a hash table to allow easy sharing among
+ * multiple instances of the sink. Keys are $context_name@$server_name
+ * (strings) and values should be GstPulseContext pointers. */
+static GHashTable *gst_pulse_shared_contexts;
+
/* We keep a custom ringbuffer that is backed up by data allocated by
* pulseaudio. We must also overide the commit function to write into
* pulseaudio memory instead. */
{
GstRingBuffer object;
+ gchar *context_name;
gchar *stream_name;
- pa_context *context;
pa_stream *stream;
pa_sample_spec sample_spec;
G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
+static GMutex *pa_ring_buffer_mutex = NULL;
+static void
+gst_pulseringbuffer_init_contexts (void)
+{
+ g_assert (pa_ring_buffer_mutex == NULL);
+ pa_ring_buffer_mutex = g_mutex_new ();
+ gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal);
+}
+
static void
gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
{
gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
{
pbuf->stream_name = NULL;
- pbuf->context = NULL;
pbuf->stream = NULL;
#ifdef HAVE_PULSE_0_9_13
pbuf->stream_name = NULL;
}
+static GstPulseContext *
+gst_pulsering_get_context (GstPulseRingBuffer * pbuf)
+{
+ GstPulseContext *pctx;
+ GstPulseSink *psink;
+
+ g_mutex_lock (pa_ring_buffer_mutex);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+ pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
+ g_mutex_unlock (pa_ring_buffer_mutex);
+ return pctx;
+}
+
static void
gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
{
+ GstPulseContext *pctx;
+ GstPulseSink *psink;
+
+ g_mutex_lock (pa_ring_buffer_mutex);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
+
gst_pulsering_destroy_stream (pbuf);
- if (pbuf->context) {
- pa_context_disconnect (pbuf->context);
+ if (pctx) {
+ pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
+ if (!g_slist_length (pctx->ring_buffers)) {
+ pa_context_disconnect (pctx->context);
- /* Make sure we don't get any further callbacks */
- pa_context_set_state_callback (pbuf->context, NULL, NULL);
+ /* Make sure we don't get any further callbacks */
+ pa_context_set_state_callback (pctx->context, NULL, NULL);
#ifdef HAVE_PULSE_0_9_12
- pa_context_set_subscribe_callback (pbuf->context, NULL, NULL);
+ pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
#endif
- pa_context_unref (pbuf->context);
- pbuf->context = NULL;
+ pa_context_unref (pctx->context);
+ g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
+ g_free (pbuf->context_name);
+ }
}
+ g_mutex_unlock (pa_ring_buffer_mutex);
}
static void
static gboolean
gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
{
- if (!pbuf->context
- || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
+ GstPulseContext *pctx = gst_pulsering_get_context (pbuf);
+
+ if (!pctx) {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL));
+ return TRUE;
+ }
+
+ if (!pctx->context
+ || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context))
|| !pbuf->stream
|| !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
- const gchar *err_str = pbuf->context ?
- pa_strerror (pa_context_errno (pbuf->context)) : NULL;
-
+ const gchar *err_str =
+ pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL;
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
err_str), (NULL));
return TRUE;
gst_pulsering_context_state_cb (pa_context * c, void *userdata)
{
GstPulseSink *psink;
- GstPulseRingBuffer *pbuf;
pa_context_state_t state;
- pbuf = GST_PULSERING_BUFFER_CAST (userdata);
- psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+ GstPulseContext *pctx = (GstPulseContext *) userdata;
+ GSList *walk;
state = pa_context_get_state (c);
- GST_LOG_OBJECT (psink, "got new context state %d", state);
-
- /* psink can be null when we are shutting down and the ringbuffer is already
- * unparented */
- if (psink == NULL)
- return;
- switch (state) {
- case PA_CONTEXT_READY:
- case PA_CONTEXT_TERMINATED:
- case PA_CONTEXT_FAILED:
- GST_LOG_OBJECT (psink, "signaling");
- pa_threaded_mainloop_signal (psink->mainloop, 0);
- break;
+ for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
+ GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+ GST_LOG_OBJECT (psink, "got new context state %d", state);
+
+ /* psink can be null when we are shutting down and the ringbuffer is already
+ * unparented */
+ if (psink == NULL)
+ continue;
+
+ switch (state) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ GST_LOG_OBJECT (psink, "signaling");
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ break;
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
}
}
pa_subscription_event_type_t t, uint32_t idx, void *userdata)
{
GstPulseSink *psink;
- GstPulseRingBuffer *pbuf;
-
- pbuf = GST_PULSERING_BUFFER_CAST (userdata);
- psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
-
- GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
+ GstPulseContext *pctx = (GstPulseContext *) userdata;
+ GSList *walk;
if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
return;
- if (!pbuf->stream)
- return;
+ for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
+ GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
- if (idx != pa_stream_get_index (pbuf->stream))
- return;
+ GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
+
+ if (!pbuf->stream)
+ continue;
- /* Actually this event is also triggered when other properties of
- * the stream change that are unrelated to the volume. However it is
- * probably cheaper to signal the change here and check for the
- * volume when the GObject property is read instead of querying it always. */
+ if (idx != pa_stream_get_index (pbuf->stream))
+ continue;
- /* inform streaming thread to notify */
- g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
+ /* Actually this event is also triggered when other properties of
+ * the stream change that are unrelated to the volume. However it is
+ * probably cheaper to signal the change here and check for the
+ * volume when the GObject property is read instead of querying it always. */
+
+ /* inform streaming thread to notify */
+ g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
+ }
}
#endif
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
- gchar *name;
+ GstPulseContext *pctx;
pa_mainloop_api *api;
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
pbuf = GST_PULSERING_BUFFER_CAST (buf);
- g_assert (!pbuf->context);
g_assert (!pbuf->stream);
- name = gst_pulse_client_name ();
+ pbuf->context_name = g_strdup_printf ("%s@%s", gst_pulse_client_name (),
+ GST_STR_NULL (psink->server));
pa_threaded_mainloop_lock (psink->mainloop);
-
- /* get the mainloop api and create a context */
- GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name));
- api = pa_threaded_mainloop_get_api (psink->mainloop);
- if (!(pbuf->context = pa_context_new (api, name)))
- goto create_failed;
-
- /* register some essential callbacks */
- pa_context_set_state_callback (pbuf->context,
- gst_pulsering_context_state_cb, pbuf);
+ g_mutex_lock (pa_ring_buffer_mutex);
+
+ pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
+ if (pctx == NULL) {
+ pctx = g_slice_new0 (GstPulseContext);
+ /* get the mainloop api and create a context */
+ GST_LOG_OBJECT (psink, "new context with name %s",
+ GST_STR_NULL (pbuf->context_name));
+ api = pa_threaded_mainloop_get_api (psink->mainloop);
+ if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
+ goto create_failed;
+
+ pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
+ g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name,
+ (gpointer) pctx);
+ /* register some essential callbacks */
+ pa_context_set_state_callback (pctx->context,
+ gst_pulsering_context_state_cb, pctx);
#ifdef HAVE_PULSE_0_9_12
- pa_context_set_subscribe_callback (pbuf->context,
- gst_pulsering_context_subscribe_cb, pbuf);
+ pa_context_set_subscribe_callback (pctx->context,
+ gst_pulsering_context_subscribe_cb, pctx);
#endif
- /* try to connect to the server and wait for completioni, we don't want to
- * autospawn a deamon */
- GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server));
- if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN,
- NULL) < 0)
- goto connect_failed;
+ /* try to connect to the server and wait for completioni, we don't want to
+ * autospawn a deamon */
+ GST_LOG_OBJECT (psink, "connect to server %s",
+ GST_STR_NULL (psink->server));
+ if (pa_context_connect (pctx->context, psink->server,
+ PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
+ goto connect_failed;
+
+
+ } else {
+ GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s",
+ GST_STR_NULL (pbuf->context_name));
+ pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
+ }
for (;;) {
pa_context_state_t state;
- state = pa_context_get_state (pbuf->context);
+ state = pa_context_get_state (pctx->context);
GST_LOG_OBJECT (psink, "context state is now %d", state);
GST_LOG_OBJECT (psink, "opened the device");
+ g_mutex_unlock (pa_ring_buffer_mutex);
pa_threaded_mainloop_unlock (psink->mainloop);
- g_free (name);
return TRUE;
/* ERRORS */
unlock_and_fail:
{
+ g_mutex_unlock (pa_ring_buffer_mutex);
gst_pulsering_destroy_context (pbuf);
pa_threaded_mainloop_unlock (psink->mainloop);
- g_free (name);
return FALSE;
}
create_failed:
connect_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
}
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pa_buffer_attr wanted;
const pa_buffer_attr *actual;
pa_channel_map channel_map;
pa_threaded_mainloop_lock (psink->mainloop);
/* we need a context and a no stream */
- g_assert (pbuf->context);
+ pctx = gst_pulsering_get_context (pbuf);
g_assert (!pbuf->stream);
/* enable event notifications */
GST_LOG_OBJECT (psink, "subscribing to context events");
- if (!(o = pa_context_subscribe (pbuf->context,
+ if (!(o = pa_context_subscribe (pctx->context,
PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
goto subscribe_failed;
/* create a stream */
GST_LOG_OBJECT (psink, "creating stream with name %s", name);
- if (!(pbuf->stream = pa_stream_new (pbuf->context,
+ if (!(pbuf->stream = pa_stream_new (pctx->context,
name, &pbuf->sample_spec, &channel_map)))
goto stream_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_subscribe() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
stream_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("Failed to create stream: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
connect_failed:
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("Failed to connect stream: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
}
{
pa_operation *o = NULL;
GstPulseSink *psink;
+ GstPulseContext *pctx = NULL;
gboolean res = FALSE;
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
}
cork_failed:
{
+ pctx = gst_pulsering_get_context (pbuf);
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_cork() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto cleanup;
}
}
{
GstPulseSink *psink;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
guint result;
guint8 *data_end;
gboolean reverse;
}
writable_size_failed:
{
+ pctx = gst_pulsering_get_context (pbuf);
+
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_writable_size() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
write_failed:
{
+ pctx = gst_pulsering_get_context (pbuf);
+
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_write() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock_and_fail;
}
}
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
+ gst_pulseringbuffer_init_contexts ();
+
gstaudiosink_class->create_ringbuffer =
GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
pa_cvolume v;
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
uint32_t idx;
if (!psink->mainloop)
gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
- if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
+ pctx = gst_pulsering_get_context (pbuf);
+
+ if (!(o = pa_context_set_sink_input_volume (pctx->context, idx,
&v, NULL, NULL)))
goto volume_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_sink_input_volume() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
{
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
uint32_t idx;
if (!psink->mainloop)
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
+ pctx = gst_pulsering_get_context (pbuf);
+
+ if (!(o = pa_context_set_sink_input_mute (pctx->context, idx,
mute, NULL, NULL)))
goto mute_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_sink_input_mute() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_get_volume (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pa_operation *o = NULL;
gdouble v = DEFAULT_VOLUME;
uint32_t idx;
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
+ pctx = gst_pulsering_get_context (pbuf);
+
+ if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
gst_pulsesink_sink_input_info_cb, pbuf)))
goto info_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_input_info() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_get_mute (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pa_operation *o = NULL;
uint32_t idx;
gboolean mute = FALSE;
if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
goto no_index;
- if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
+ pctx = gst_pulsering_get_context (pbuf);
+
+ if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
gst_pulsesink_sink_input_info_cb, pbuf)))
goto info_failed;
}
unlock:
+
if (o)
pa_operation_unref (o);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_input_info() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
gst_pulsesink_device_description (GstPulseSink * psink)
{
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pa_operation *o = NULL;
gchar *t;
if (pbuf == NULL || pbuf->stream == NULL)
goto no_buffer;
- if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
+ pctx = gst_pulsering_get_context (pbuf);
+
+ if (!(o = pa_context_get_sink_info_by_index (pctx->context,
pa_stream_get_device_index (pbuf->stream),
gst_pulsesink_sink_info_cb, pbuf)))
goto info_failed;
}
unlock:
+
if (o)
pa_operation_unref (o);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_context_get_sink_info_by_index() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
{
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pa_threaded_mainloop_lock (psink->mainloop);
g_free (pbuf->stream_name);
pbuf->stream_name = g_strdup (t);
+ pctx = gst_pulsering_get_context (pbuf);
+
if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
goto name_failed;
/* We're not interested if this operation failed or not */
unlock:
+
if (o)
pa_operation_unref (o);
pa_threaded_mainloop_unlock (psink->mainloop);
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_name() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}
gboolean empty = TRUE;
pa_operation *o = NULL;
GstPulseRingBuffer *pbuf;
+ GstPulseContext *pctx;
pl = pa_proplist_new ();
goto finish;
pa_threaded_mainloop_lock (psink->mainloop);
+
pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
if (pbuf == NULL || pbuf->stream == NULL)
goto no_buffer;
+ pctx = gst_pulsering_get_context (pbuf);
+
if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
pl, NULL, NULL)))
goto update_failed;
{
GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_proplist_update() failed: %s",
- pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ pa_strerror (pa_context_errno (pctx->context))), (NULL));
goto unlock;
}
}