typedef struct
{
- GstBin *bin;
guint32 cookie;
GstState pending;
} BinContinueData;
static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
gboolean flag_pending, GstClockTime running_time);
static void bin_handle_async_start (GstBin * bin);
-static void bin_push_state_continue (BinContinueData * data);
+static void bin_push_state_continue (GstBin * bin, BinContinueData * data);
static void bin_do_eos (GstBin * bin);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
static void bin_remove_messages (GstBin * bin, GstObject * src,
GstMessageType types);
-static void gst_bin_continue_func (BinContinueData * data);
+static void gst_bin_continue_func (GstBin * bin, BinContinueData * data);
static gint bin_element_is_sink (GstElement * child, GstBin * bin);
static gint bin_element_is_src (GstElement * child, GstBin * bin);
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
- GError *err;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
klass->handle_message = GST_DEBUG_FUNCPTR (gst_bin_handle_message_func);
klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func);
-
- GST_DEBUG ("creating bin thread pool");
- err = NULL;
- klass->pool =
- g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
- if (err != NULL) {
- g_critical ("could not alloc threadpool %s", err->message);
- }
}
static void
* their state, this function will attempt to bring the bin to the next state.
*/
static void
-gst_bin_continue_func (BinContinueData * data)
+gst_bin_continue_func (GstBin * bin, BinContinueData * data)
{
- GstBin *bin;
GstState current, next, pending;
GstStateChange transition;
- bin = data->bin;
pending = data->pending;
GST_DEBUG_OBJECT (bin, "waiting for state lock");
GST_STATE_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "state continue done");
- gst_object_unref (bin);
- g_slice_free (BinContinueData, data);
return;
interrupted:
GST_OBJECT_UNLOCK (bin);
GST_STATE_UNLOCK (bin);
GST_DEBUG_OBJECT (bin, "state continue aborted due to intervening change");
- gst_object_unref (bin);
- g_slice_free (BinContinueData, data);
return;
}
}
}
static void
-bin_push_state_continue (BinContinueData * data)
+free_bin_continue_data (BinContinueData * data)
{
- GstBinClass *klass;
- GstBin *bin;
-
- /* ref was taken */
- bin = data->bin;
- klass = GST_BIN_GET_CLASS (bin);
+ g_slice_free (BinContinueData, data);
+}
+static void
+bin_push_state_continue (GstBin * bin, BinContinueData * data)
+{
GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
- g_thread_pool_push (klass->pool, data, NULL);
+ gst_element_call_async (GST_ELEMENT_CAST (bin),
+ (GstElementCallAsyncFunc) gst_bin_continue_func, data,
+ (GDestroyNotify) free_bin_continue_data);
}
/* an element started an async state change, if we were not busy with a state
cont = g_slice_new (BinContinueData);
- /* ref to the bin */
- cont->bin = gst_object_ref (bin);
/* cookie to detect concurrent state change */
cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie;
/* pending target state */
if (cont) {
/* toplevel, start continue state */
GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
- bin_push_state_continue (cont);
+ bin_push_state_continue (bin, cont);
} else {
GST_DEBUG_OBJECT (bin, "state change complete");
GST_STATE_BROADCAST (bin);
* gst_element_class_get_request_pad_template (GstElementClass *
element_class, const gchar * name);
+static void gst_element_call_async_func (gpointer data, gpointer user_data);
+
static GstObjectClass *parent_class = NULL;
static guint gst_element_signals[LAST_SIGNAL] = { 0 };
+static GThreadPool *gst_element_pool = NULL;
+
/* this is used in gstelementfactory.c:gst_element_register() */
GQuark __gst_elementclass_factory = 0;
gst_element_class_init (GstElementClass * klass)
{
GObjectClass *gobject_class;
+ GError *err = NULL;
gobject_class = (GObjectClass *) klass;
klass->set_context = GST_DEBUG_FUNCPTR (gst_element_set_context_default);
klass->elementfactory = NULL;
+
+ GST_DEBUG ("creating element thread pool");
+ gst_element_pool =
+ g_thread_pool_new ((GFunc) gst_element_call_async_func, NULL, -1, FALSE,
+ &err);
+ if (err != NULL) {
+ g_critical ("could not alloc threadpool %s", err->message);
+ g_clear_error (&err);
+ }
}
static void
{
g_signal_handler_disconnect (element, watch_id);
}
+
+typedef struct
+{
+ GstElement *element;
+ GstElementCallAsyncFunc func;
+ gpointer user_data;
+ GDestroyNotify destroy_notify;
+} GstElementCallAsyncData;
+
+static void
+gst_element_call_async_func (gpointer data, gpointer user_data)
+{
+ GstElementCallAsyncData *async_data = data;
+
+ async_data->func (async_data->element, async_data->user_data);
+ if (async_data->destroy_notify)
+ async_data->destroy_notify (async_data->user_data);
+ gst_object_unref (async_data->element);
+ g_free (async_data);
+}
+
+/**
+ * gst_element_call_async:
+ * @element: a #GstElement
+ * @func: Function to call asynchronously from another thread
+ * @user_data: Data to pass to @func
+ * @destroy_notify: GDestroyNotify for @user_data
+ *
+ * Calls @func from another thread and passes @user_data to it. This is to be
+ * used for cases when a state change has to be performed from a streaming
+ * thread, directly via gst_element_set_state() or indirectly e.g. via SEEK
+ * events.
+ *
+ * Calling those functions directly from the streaming thread will cause
+ * deadlocks in many situations, as they might involve waiting for the
+ * streaming thread to shut down from this very streaming thread.
+ *
+ * MT safe.
+ *
+ * Since: 1.10
+ */
+void
+gst_element_call_async (GstElement * element, GstElementCallAsyncFunc func,
+ gpointer user_data, GDestroyNotify destroy_notify)
+{
+ GstElementCallAsyncData *async_data;
+
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
+ async_data = g_new0 (GstElementCallAsyncData, 1);
+ async_data->element = gst_object_ref (element);
+ async_data->func = func;
+ async_data->user_data = user_data;
+ async_data->destroy_notify = destroy_notify;
+
+ g_thread_pool_push (gst_element_pool, async_data, NULL);
+}