element: Add gst_element_call_async()
authorSebastian Dröge <sebastian@centricular.com>
Sun, 28 Feb 2016 10:06:40 +0000 (12:06 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 27 Apr 2016 15:51:33 +0000 (18:51 +0300)
This calls a function from another thread, asynchronously. This is to be
used for cases when a state change has to be performed from a streaming
thread, directly via gst_element_set_state() or indirectly e.g. via SEEK
events.

Calling those functions directly from the streaming thread will cause
deadlocks in many situations, as they might involve waiting for the
streaming thread to shut down from this very streaming thread.

This is mostly a convenience function around a GThreadPool and is for example
used by GstBin to continue asynchronous state changes.

https://bugzilla.gnome.org/show_bug.cgi?id=760532

docs/gst/gstreamer-sections.txt
gst/gstbin.c
gst/gstbin.h
gst/gstelement.c
gst/gstelement.h
win32/common/libgstreamer.def

index 24d6aca..b5a9d92 100644 (file)
@@ -917,6 +917,10 @@ gst_element_add_property_notify_watch
 gst_element_add_property_deep_notify_watch
 gst_element_remove_property_notify_watch
 
+<SUBSECTION element-call-async>
+GstElementCallAsyncFunc
+gst_element_call_async
+
 <SUBSECTION Standard>
 GST_ELEMENT
 GST_IS_ELEMENT
index 05cbd8f..26af1c7 100644 (file)
@@ -201,7 +201,6 @@ struct _GstBinPrivate
 
 typedef struct
 {
-  GstBin *bin;
   guint32 cookie;
   GstState pending;
 } BinContinueData;
@@ -221,7 +220,7 @@ static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
 static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
     gboolean flag_pending, GstClockTime running_time);
 static void bin_handle_async_start (GstBin * bin);
-static void bin_push_state_continue (BinContinueData * data);
+static void bin_push_state_continue (GstBin * bin, BinContinueData * data);
 static void bin_do_eos (GstBin * bin);
 
 static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
@@ -249,7 +248,7 @@ static gboolean gst_bin_do_latency_func (GstBin * bin);
 
 static void bin_remove_messages (GstBin * bin, GstObject * src,
     GstMessageType types);
-static void gst_bin_continue_func (BinContinueData * data);
+static void gst_bin_continue_func (GstBin * bin, BinContinueData * data);
 static gint bin_element_is_sink (GstElement * child, GstBin * bin);
 static gint bin_element_is_src (GstElement * child, GstBin * bin);
 
@@ -358,7 +357,6 @@ gst_bin_class_init (GstBinClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
-  GError *err;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
@@ -470,14 +468,6 @@ gst_bin_class_init (GstBinClass * klass)
   klass->handle_message = GST_DEBUG_FUNCPTR (gst_bin_handle_message_func);
 
   klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func);
-
-  GST_DEBUG ("creating bin thread pool");
-  err = NULL;
-  klass->pool =
-      g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
-  if (err != NULL) {
-    g_critical ("could not alloc threadpool %s", err->message);
-  }
 }
 
 static void
@@ -3025,13 +3015,11 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
  * their state, this function will attempt to bring the bin to the next state.
  */
 static void
-gst_bin_continue_func (BinContinueData * data)
+gst_bin_continue_func (GstBin * bin, BinContinueData * data)
 {
-  GstBin *bin;
   GstState current, next, pending;
   GstStateChange transition;
 
-  bin = data->bin;
   pending = data->pending;
 
   GST_DEBUG_OBJECT (bin, "waiting for state lock");
@@ -3065,8 +3053,6 @@ gst_bin_continue_func (BinContinueData * data)
   GST_STATE_UNLOCK (bin);
   GST_DEBUG_OBJECT (bin, "state continue done");
 
-  gst_object_unref (bin);
-  g_slice_free (BinContinueData, data);
   return;
 
 interrupted:
@@ -3074,8 +3060,6 @@ interrupted:
     GST_OBJECT_UNLOCK (bin);
     GST_STATE_UNLOCK (bin);
     GST_DEBUG_OBJECT (bin, "state continue aborted due to intervening change");
-    gst_object_unref (bin);
-    g_slice_free (BinContinueData, data);
     return;
   }
 }
@@ -3095,17 +3079,18 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
 }
 
 static void
-bin_push_state_continue (BinContinueData * data)
+free_bin_continue_data (BinContinueData * data)
 {
-  GstBinClass *klass;
-  GstBin *bin;
-
-  /* ref was taken */
-  bin = data->bin;
-  klass = GST_BIN_GET_CLASS (bin);
+  g_slice_free (BinContinueData, data);
+}
 
+static void
+bin_push_state_continue (GstBin * bin, BinContinueData * data)
+{
   GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
-  g_thread_pool_push (klass->pool, data, NULL);
+  gst_element_call_async (GST_ELEMENT_CAST (bin),
+      (GstElementCallAsyncFunc) gst_bin_continue_func, data,
+      (GDestroyNotify) free_bin_continue_data);
 }
 
 /* an element started an async state change, if we were not busy with a state
@@ -3268,8 +3253,6 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
 
     cont = g_slice_new (BinContinueData);
 
-    /* ref to the bin */
-    cont->bin = gst_object_ref (bin);
     /* cookie to detect concurrent state change */
     cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie;
     /* pending target state */
@@ -3300,7 +3283,7 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
   if (cont) {
     /* toplevel, start continue state */
     GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
-    bin_push_state_continue (cont);
+    bin_push_state_continue (bin, cont);
   } else {
     GST_DEBUG_OBJECT (bin, "state change complete");
     GST_STATE_BROADCAST (bin);
index c8ffeff..b67e24f 100644 (file)
@@ -152,7 +152,7 @@ struct _GstBinClass {
   GstElementClass parent_class;
 
   /*< private >*/
-  GThreadPool  *pool;
+  GThreadPool  *pool; /* deprecated */
 
   /* signals */
   void         (*element_added)        (GstBin *bin, GstElement *child);
index 75ca7b2..0b204f9 100644 (file)
@@ -147,9 +147,13 @@ static GstPadTemplate
     * gst_element_class_get_request_pad_template (GstElementClass *
     element_class, const gchar * name);
 
+static void gst_element_call_async_func (gpointer data, gpointer user_data);
+
 static GstObjectClass *parent_class = NULL;
 static guint gst_element_signals[LAST_SIGNAL] = { 0 };
 
+static GThreadPool *gst_element_pool = NULL;
+
 /* this is used in gstelementfactory.c:gst_element_register() */
 GQuark __gst_elementclass_factory = 0;
 
@@ -187,6 +191,7 @@ static void
 gst_element_class_init (GstElementClass * klass)
 {
   GObjectClass *gobject_class;
+  GError *err = NULL;
 
   gobject_class = (GObjectClass *) klass;
 
@@ -247,6 +252,15 @@ gst_element_class_init (GstElementClass * klass)
   klass->set_context = GST_DEBUG_FUNCPTR (gst_element_set_context_default);
 
   klass->elementfactory = NULL;
+
+  GST_DEBUG ("creating element thread pool");
+  gst_element_pool =
+      g_thread_pool_new ((GFunc) gst_element_call_async_func, NULL, -1, FALSE,
+      &err);
+  if (err != NULL) {
+    g_critical ("could not alloc threadpool %s", err->message);
+    g_clear_error (&err);
+  }
 }
 
 static void
@@ -3364,3 +3378,60 @@ gst_element_remove_property_notify_watch (GstElement * element, gulong watch_id)
 {
   g_signal_handler_disconnect (element, watch_id);
 }
+
+typedef struct
+{
+  GstElement *element;
+  GstElementCallAsyncFunc func;
+  gpointer user_data;
+  GDestroyNotify destroy_notify;
+} GstElementCallAsyncData;
+
+static void
+gst_element_call_async_func (gpointer data, gpointer user_data)
+{
+  GstElementCallAsyncData *async_data = data;
+
+  async_data->func (async_data->element, async_data->user_data);
+  if (async_data->destroy_notify)
+    async_data->destroy_notify (async_data->user_data);
+  gst_object_unref (async_data->element);
+  g_free (async_data);
+}
+
+/**
+ * gst_element_call_async:
+ * @element: a #GstElement
+ * @func: Function to call asynchronously from another thread
+ * @user_data: Data to pass to @func
+ * @destroy_notify: GDestroyNotify for @user_data
+ *
+ * Calls @func from another thread and passes @user_data to it. This is to be
+ * used for cases when a state change has to be performed from a streaming
+ * thread, directly via gst_element_set_state() or indirectly e.g. via SEEK
+ * events.
+ *
+ * Calling those functions directly from the streaming thread will cause
+ * deadlocks in many situations, as they might involve waiting for the
+ * streaming thread to shut down from this very streaming thread.
+ *
+ * MT safe.
+ *
+ * Since: 1.10
+ */
+void
+gst_element_call_async (GstElement * element, GstElementCallAsyncFunc func,
+    gpointer user_data, GDestroyNotify destroy_notify)
+{
+  GstElementCallAsyncData *async_data;
+
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
+  async_data = g_new0 (GstElementCallAsyncData, 1);
+  async_data->element = gst_object_ref (element);
+  async_data->func = func;
+  async_data->user_data = user_data;
+  async_data->destroy_notify = destroy_notify;
+
+  g_thread_pool_push (gst_element_pool, async_data, NULL);
+}
index 39aa8cf..a18b0ca 100644 (file)
@@ -816,6 +816,13 @@ GstStateChangeReturn    gst_element_continue_state      (GstElement * element,
                                                          GstStateChangeReturn ret);
 void                    gst_element_lost_state          (GstElement * element);
 
+typedef void          (*GstElementCallAsyncFunc)        (GstElement * element,
+                                                         gpointer     user_data);
+
+void                    gst_element_call_async          (GstElement * element,
+                                                         GstElementCallAsyncFunc func, gpointer user_data,
+                                                         GDestroyNotify destroy_notify);
+
 /* factory management */
 GstElementFactory*      gst_element_get_factory         (GstElement *element);
 
index 0b54e8d..7033a5d 100644 (file)
@@ -478,6 +478,7 @@ EXPORTS
        gst_element_add_pad
        gst_element_add_property_deep_notify_watch
        gst_element_add_property_notify_watch
+       gst_element_call_async
        gst_element_change_state
        gst_element_class_add_metadata
        gst_element_class_add_pad_template