tee: Check for the removed pad flag also in the slow pushing path
[platform/upstream/gstreamer.git] / gst / gstbin.c
index 9190bfe..17190fd 100644 (file)
@@ -25,6 +25,7 @@
 
 /**
  * SECTION:gstbin
+ * @title: GstBin
  * @short_description: Base class and element that can contain other elements
  *
  * #GstBin is an element that can contain other #GstElement, allowing them to be
  * the bin. Likewise the #GstBin::element-removed signal is fired whenever an
  * element is removed from the bin.
  *
- * <refsect2><title>Notes</title>
- * <para>
+ * ## Notes
+ *
  * A #GstBin internally intercepts every #GstMessage posted by its children and
  * implements the following default behaviour for each of them:
- * <variablelist>
- *   <varlistentry>
- *     <term>GST_MESSAGE_EOS</term>
- *     <listitem><para>This message is only posted by sinks in the PLAYING
- *     state. If all sinks posted the EOS message, this bin will post and EOS
- *     message upwards.</para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_MESSAGE_SEGMENT_START</term>
- *     <listitem><para>just collected and never forwarded upwards.
- *     The messages are used to decide when all elements have completed playback
- *     of their segment.</para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_MESSAGE_SEGMENT_DONE</term>
- *     <listitem><para> Is posted by #GstBin when all elements that posted
- *     a SEGMENT_START have posted a SEGMENT_DONE.</para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_MESSAGE_DURATION_CHANGED</term>
- *     <listitem><para> Is posted by an element that detected a change
- *     in the stream duration. The default bin behaviour is to clear any
- *     cached duration values so that the next duration query will perform
- *     a full duration recalculation. The duration change is posted to the
- *     application so that it can refetch the new duration with a duration
- *     query. Note that these messages can be posted before the bin is
- *     prerolled, in which case the duration query might fail.
- *     </para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_MESSAGE_CLOCK_LOST</term>
- *     <listitem><para> This message is posted by an element when it
- *     can no longer provide a clock. The default bin behaviour is to
- *     check if the lost clock was the one provided by the bin. If so and
- *     the bin is currently in the PLAYING state, the message is forwarded to
- *     the bin parent.
- *     This message is also generated when a clock provider is removed from
- *     the bin. If this message is received by the application, it should
- *     PAUSE the pipeline and set it back to PLAYING to force a new clock
- *     distribution.
- *     </para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_MESSAGE_CLOCK_PROVIDE</term>
- *     <listitem><para> This message is generated when an element
- *     can provide a clock. This mostly happens when a new clock
- *     provider is added to the bin. The default behaviour of the bin is to
- *     mark the currently selected clock as dirty, which will perform a clock
- *     recalculation the next time the bin is asked to provide a clock.
- *     This message is never sent tot the application but is forwarded to
- *     the parent of the bin.
- *     </para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>OTHERS</term>
- *     <listitem><para> posted upwards.</para></listitem>
- *   </varlistentry>
- * </variablelist>
  *
+ * * GST_MESSAGE_EOS: This message is only posted by sinks in the PLAYING
+ * state. If all sinks posted the EOS message, this bin will post and EOS
+ * message upwards.
+ *
+ * * GST_MESSAGE_SEGMENT_START: Just collected and never forwarded upwards.
+ * The messages are used to decide when all elements have completed playback
+ * of their segment.
+ *
+ * * GST_MESSAGE_SEGMENT_DONE: Is posted by #GstBin when all elements that posted
+ * a SEGMENT_START have posted a SEGMENT_DONE.
+ *
+ * * GST_MESSAGE_DURATION_CHANGED: Is posted by an element that detected a change
+ * in the stream duration. The default bin behaviour is to clear any
+ * cached duration values so that the next duration query will perform
+ * a full duration recalculation. The duration change is posted to the
+ * application so that it can refetch the new duration with a duration
+ * query. Note that these messages can be posted before the bin is
+ * prerolled, in which case the duration query might fail.
+ *
+ * * GST_MESSAGE_CLOCK_LOST: This message is posted by an element when it
+ * can no longer provide a clock. The default bin behaviour is to
+ * check if the lost clock was the one provided by the bin. If so and
+ * the bin is currently in the PLAYING state, the message is forwarded to
+ * the bin parent.
+ * This message is also generated when a clock provider is removed from
+ * the bin. If this message is received by the application, it should
+ * PAUSE the pipeline and set it back to PLAYING to force a new clock
+ * distribution.
+ *
+ * * GST_MESSAGE_CLOCK_PROVIDE: This message is generated when an element
+ * can provide a clock. This mostly happens when a new clock
+ * provider is added to the bin. The default behaviour of the bin is to
+ * mark the currently selected clock as dirty, which will perform a clock
+ * recalculation the next time the bin is asked to provide a clock.
+ * This message is never sent tot the application but is forwarded to
+ * the parent of the bin.
+ *
+ * * OTHERS: posted upwards.
  *
  * A #GstBin implements the following default behaviour for answering to a
  * #GstQuery:
- * <variablelist>
- *   <varlistentry>
- *     <term>GST_QUERY_DURATION</term>
- *     <listitem><para>If the query has been asked before with the same format
- *     and the bin is a toplevel bin (ie. has no parent),
- *     use the cached previous value. If no previous value was cached, the
- *     query is sent to all sink elements in the bin and the MAXIMUM of all
- *     values is returned. If the bin is a toplevel bin the value is cached.
- *     If no sinks are available in the bin, the query fails.
- *     </para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>GST_QUERY_POSITION</term>
- *     <listitem><para>The query is sent to all sink elements in the bin and the
- *     MAXIMUM of all values is returned. If no sinks are available in the bin,
- *     the query fails.
- *     </para></listitem>
- *   </varlistentry>
- *   <varlistentry>
- *     <term>OTHERS</term>
- *     <listitem><para>the query is forwarded to all sink elements, the result
- *     of the first sink that answers the query successfully is returned. If no
- *     sink is in the bin, the query fails.</para></listitem>
- *   </varlistentry>
- * </variablelist>
  *
- * A #GstBin will by default forward any event sent to it to all sink elements.
- * If all the sinks return %TRUE, the bin will also return %TRUE, else %FALSE is
- * returned. If no sinks are in the bin, the event handler will return %TRUE.
+ * * GST_QUERY_DURATION:If the query has been asked before with the same format
+ * and the bin is a toplevel bin (ie. has no parent),
+ * use the cached previous value. If no previous value was cached, the
+ * query is sent to all sink elements in the bin and the MAXIMUM of all
+ * values is returned. If the bin is a toplevel bin the value is cached.
+ * If no sinks are available in the bin, the query fails.
+ *
+ * * GST_QUERY_POSITION:The query is sent to all sink elements in the bin and the
+ * MAXIMUM of all values is returned. If no sinks are available in the bin,
+ * the query fails.
+ *
+ * * OTHERS:the query is forwarded to all sink elements, the result
+ * of the first sink that answers the query successfully is returned. If no
+ * sink is in the bin, the query fails.
+ *
+ * A #GstBin will by default forward any event sent to it to all sink
+ * (#GST_EVENT_TYPE_DOWNSTREAM) or source (#GST_EVENT_TYPE_UPSTREAM) elements
+ * depending on the event type.
+ * If all the elements return %TRUE, the bin will also return %TRUE, else %FALSE
+ * is returned. If no elements of the required type are in the bin, the event
+ * handler will return %TRUE.
  *
- * </para>
- * </refsect2>
  */
 
 #include "gst_private.h"
@@ -172,9 +144,6 @@ GST_DEBUG_CATEGORY_STATIC (bin_debug);
  * a toplevel bin */
 #define BIN_IS_TOPLEVEL(bin) ((GST_OBJECT_PARENT (bin) == NULL) || bin->priv->asynchandling)
 
-#define GST_BIN_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BIN, GstBinPrivate))
-
 struct _GstBinPrivate
 {
   gboolean asynchandling;
@@ -197,11 +166,11 @@ struct _GstBinPrivate
 
   gboolean posted_eos;
   gboolean posted_playing;
+  GstElementFlags suppressed_flags;
 };
 
 typedef struct
 {
-  GstBin *bin;
   guint32 cookie;
   GstState pending;
 } BinContinueData;
@@ -221,11 +190,15 @@ static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
 static void bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
     gboolean flag_pending, GstClockTime running_time);
 static void bin_handle_async_start (GstBin * bin);
-static void bin_push_state_continue (BinContinueData * data);
+static void bin_push_state_continue (GstBin * bin, BinContinueData * data);
 static void bin_do_eos (GstBin * bin);
 
 static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
 static gboolean gst_bin_remove_func (GstBin * bin, GstElement * element);
+static void gst_bin_deep_element_added_func (GstBin * bin, GstBin * sub_bin,
+    GstElement * element);
+static void gst_bin_deep_element_removed_func (GstBin * bin, GstBin * sub_bin,
+    GstElement * element);
 static void gst_bin_update_context (GstBin * bin, GstContext * context);
 static void gst_bin_update_context_unlocked (GstBin * bin,
     GstContext * context);
@@ -249,7 +222,7 @@ static gboolean gst_bin_do_latency_func (GstBin * bin);
 
 static void bin_remove_messages (GstBin * bin, GstObject * src,
     GstMessageType types);
-static void gst_bin_continue_func (BinContinueData * data);
+static void gst_bin_continue_func (GstBin * bin, BinContinueData * data);
 static gint bin_element_is_sink (GstElement * child, GstBin * bin);
 static gint bin_element_is_src (GstElement * child, GstBin * bin);
 
@@ -261,6 +234,8 @@ enum
   ELEMENT_ADDED,
   ELEMENT_REMOVED,
   DO_LATENCY,
+  DEEP_ELEMENT_ADDED,
+  DEEP_ELEMENT_REMOVED,
   LAST_SIGNAL
 };
 
@@ -294,7 +269,9 @@ static guint gst_bin_signals[LAST_SIGNAL] = { 0 };
 }
 
 #define gst_bin_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstBin, gst_bin, GST_TYPE_ELEMENT, _do_init);
+G_DEFINE_TYPE_WITH_CODE (GstBin, gst_bin, GST_TYPE_ELEMENT,
+    G_ADD_PRIVATE (GstBin)
+    _do_init);
 
 static GObject *
 gst_bin_child_proxy_get_child_by_index (GstChildProxy * child_proxy,
@@ -358,13 +335,10 @@ gst_bin_class_init (GstBinClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
-  GError *err;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstBinPrivate));
-
   gobject_class->set_property = gst_bin_set_property;
   gobject_class->get_property = gst_bin_get_property;
 
@@ -403,6 +377,36 @@ gst_bin_class_init (GstBinClass * klass)
       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstBinClass, element_removed), NULL,
       NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
   /**
+   * GstBin::deep-element-added:
+   * @bin: the #GstBin
+   * @sub_bin: the #GstBin the element was added to
+   * @element: the #GstElement that was added to @sub_bin
+   *
+   * Will be emitted after the element was added to sub_bin.
+   *
+   * Since: 1.10
+   */
+  gst_bin_signals[DEEP_ELEMENT_ADDED] =
+      g_signal_new ("deep-element-added", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstBinClass, deep_element_added),
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, GST_TYPE_BIN,
+      GST_TYPE_ELEMENT);
+  /**
+   * GstBin::deep-element-removed:
+   * @bin: the #GstBin
+   * @sub_bin: the #GstBin the element was removed from
+   * @element: the #GstElement that was removed from @sub_bin
+   *
+   * Will be emitted after the element was removed from sub_bin.
+   *
+   * Since: 1.10
+   */
+  gst_bin_signals[DEEP_ELEMENT_REMOVED] =
+      g_signal_new ("deep-element-removed", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstBinClass, deep_element_removed),
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, GST_TYPE_BIN,
+      GST_TYPE_ELEMENT);
+  /**
    * GstBin::do-latency:
    * @bin: the #GstBin
    *
@@ -469,15 +473,10 @@ gst_bin_class_init (GstBinClass * klass)
   klass->remove_element = GST_DEBUG_FUNCPTR (gst_bin_remove_func);
   klass->handle_message = GST_DEBUG_FUNCPTR (gst_bin_handle_message_func);
 
-  klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func);
+  klass->deep_element_added = gst_bin_deep_element_added_func;
+  klass->deep_element_removed = gst_bin_deep_element_removed_func;
 
-  GST_DEBUG ("creating bin thread pool");
-  err = NULL;
-  klass->pool =
-      g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
-  if (err != NULL) {
-    g_critical ("could not alloc threadpool %s", err->message);
-  }
+  klass->do_latency = GST_DEBUG_FUNCPTR (gst_bin_do_latency_func);
 }
 
 static void
@@ -494,13 +493,14 @@ gst_bin_init (GstBin * bin)
 
   /* Set up a bus for listening to child elements */
   bus = g_object_new (GST_TYPE_BUS, "enable-async", FALSE, NULL);
+  gst_object_ref_sink (bus);
   bin->child_bus = bus;
   GST_DEBUG_OBJECT (bin, "using bus %" GST_PTR_FORMAT " to listen to children",
       bus);
   gst_bus_set_sync_handler (bus, (GstBusSyncHandler) bin_bus_handler, bin,
       NULL);
 
-  bin->priv = GST_BIN_GET_PRIVATE (bin);
+  bin->priv = gst_bin_get_instance_private (bin);
   bin->priv->asynchandling = DEFAULT_ASYNC_HANDLING;
   bin->priv->structure_cookie = 0;
   bin->priv->message_forward = DEFAULT_MESSAGE_FORWARD;
@@ -514,7 +514,7 @@ gst_bin_dispose (GObject * object)
   GstClock **provided_clock_p = &bin->provided_clock;
   GstElement **clock_provider_p = &bin->clock_provider;
 
-  GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, object, "dispose");
+  GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, object, "%p dispose", object);
 
   GST_OBJECT_LOCK (object);
   gst_object_replace ((GstObject **) child_bus_p, NULL);
@@ -954,7 +954,8 @@ bin_remove_messages (GstBin * bin, GstObject * src, GstMessageType types)
 
     if (message_check (message, &find) == 0) {
       GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
-          "deleting message %p of types 0x%08x", message, types);
+          "deleting message %p of type %s (types 0x%08x)", message,
+          GST_MESSAGE_TYPE_NAME (message), types);
       bin->messages = g_list_delete_link (bin->messages, walk);
       gst_message_unref (message);
     } else {
@@ -1083,6 +1084,59 @@ unlink_pads (const GValue * item, gpointer user_data)
   }
 }
 
+static void
+bin_deep_iterator_foreach (const GValue * item, gpointer user_data)
+{
+  GQueue *queue = user_data;
+
+  g_queue_push_tail (queue, g_value_dup_object (item));
+}
+
+static void
+gst_bin_do_deep_add_remove (GstBin * bin, gint sig_id, const gchar * sig_name,
+    GstElement * element)
+{
+  g_signal_emit (bin, sig_id, 0, bin, element);
+
+  /* When removing a bin, emit deep-element-* for everything in the bin too */
+  if (GST_IS_BIN (element)) {
+    GstIterator *it;
+    GstIteratorResult ires;
+    GQueue elements = G_QUEUE_INIT;
+
+    GST_LOG_OBJECT (bin, "Recursing into bin %" GST_PTR_FORMAT " for %s",
+        element, sig_name);
+    it = gst_bin_iterate_recurse (GST_BIN_CAST (element));
+    do {
+      ires = gst_iterator_foreach (it, bin_deep_iterator_foreach, &elements);
+      if (ires != GST_ITERATOR_DONE) {
+        g_queue_foreach (&elements, (GFunc) g_object_unref, NULL);
+        g_queue_clear (&elements);
+      }
+      if (ires == GST_ITERATOR_RESYNC)
+        gst_iterator_resync (it);
+    } while (ires == GST_ITERATOR_RESYNC);
+    if (ires != GST_ITERATOR_ERROR) {
+      GstElement *e;
+
+      while ((e = g_queue_pop_head (&elements))) {
+        GstObject *parent = gst_object_get_parent (GST_OBJECT_CAST (e));
+
+        /* an element could have removed some of its internal elements
+         * meanwhile, so protect against that */
+        if (parent) {
+          GST_LOG_OBJECT (bin, "calling %s for element %" GST_PTR_FORMAT
+              " in bin %" GST_PTR_FORMAT, sig_name, e, parent);
+          g_signal_emit (bin, sig_id, 0, parent, e);
+          gst_object_unref (parent);
+          g_object_unref (e);
+        }
+      }
+    }
+    gst_iterator_free (it);
+  }
+}
+
 /* vmethod that adds an element to a bin
  *
  * MT safe
@@ -1129,23 +1183,25 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
     goto had_parent;
 
   /* if we add a sink we become a sink */
-  if (is_sink) {
+  if (is_sink && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_SINK)) {
     GST_CAT_DEBUG_OBJECT (GST_CAT_PARENTAGE, bin, "element \"%s\" was sink",
         elem_name);
     GST_OBJECT_FLAG_SET (bin, GST_ELEMENT_FLAG_SINK);
   }
-  if (is_source) {
+  if (is_source && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_SOURCE)) {
     GST_CAT_DEBUG_OBJECT (GST_CAT_PARENTAGE, bin, "element \"%s\" was source",
         elem_name);
     GST_OBJECT_FLAG_SET (bin, GST_ELEMENT_FLAG_SOURCE);
   }
-  if (provides_clock) {
+  if (provides_clock
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_PROVIDE_CLOCK)) {
     GST_DEBUG_OBJECT (bin, "element \"%s\" can provide a clock", elem_name);
     clock_message =
         gst_message_new_clock_provide (GST_OBJECT_CAST (element), NULL, TRUE);
     GST_OBJECT_FLAG_SET (bin, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
   }
-  if (requires_clock) {
+  if (requires_clock
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_REQUIRE_CLOCK)) {
     GST_DEBUG_OBJECT (bin, "element \"%s\" requires a clock", elem_name);
     GST_OBJECT_FLAG_SET (bin, GST_ELEMENT_FLAG_REQUIRE_CLOCK);
   }
@@ -1249,12 +1305,14 @@ no_state_recalc:
     s = (GstStructure *) gst_message_get_structure (msg);
     gst_structure_get (s, "bin.old.context", GST_TYPE_CONTEXT, &context, NULL);
     gst_structure_remove_field (s, "bin.old.context");
-    gst_element_post_message (GST_ELEMENT_CAST (bin), msg);
+    /* Keep the msg around while we still need access to the context_type */
+    gst_element_post_message (GST_ELEMENT_CAST (bin), gst_message_ref (msg));
 
     /* lock to avoid losing a potential write */
     GST_OBJECT_LOCK (bin);
     replacement =
         gst_element_get_context_unlocked (GST_ELEMENT_CAST (bin), context_type);
+    gst_message_unref (msg);
 
     if (replacement) {
       /* we got the context set from GstElement::set_context */
@@ -1288,7 +1346,9 @@ no_state_recalc:
 
   /* unlink all linked pads */
   it = gst_element_iterate_pads (element);
-  gst_iterator_foreach (it, (GstIteratorForeachFunction) unlink_pads, NULL);
+  while (gst_iterator_foreach (it, (GstIteratorForeachFunction) unlink_pads,
+          NULL) == GST_ITERATOR_RESYNC)
+    gst_iterator_resync (it);
   gst_iterator_free (it);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_PARENTAGE, bin, "added element \"%s\"",
@@ -1298,6 +1358,9 @@ no_state_recalc:
   gst_child_proxy_child_added ((GstChildProxy *) bin, (GObject *) element,
       elem_name);
 
+  gst_bin_do_deep_add_remove (bin, gst_bin_signals[DEEP_ELEMENT_ADDED],
+      "deep-element-added", element);
+
   g_free (elem_name);
 
   return TRUE;
@@ -1308,6 +1371,8 @@ adding_itself:
     GST_OBJECT_LOCK (bin);
     g_warning ("Cannot add bin '%s' to itself", GST_ELEMENT_NAME (bin));
     GST_OBJECT_UNLOCK (bin);
+    gst_object_ref_sink (element);
+    gst_object_unref (element);
     return FALSE;
   }
 duplicate_name:
@@ -1316,6 +1381,8 @@ duplicate_name:
         elem_name, GST_ELEMENT_NAME (bin));
     GST_OBJECT_UNLOCK (bin);
     g_free (elem_name);
+    gst_object_ref_sink (element);
+    gst_object_unref (element);
     return FALSE;
   }
 had_parent:
@@ -1328,9 +1395,108 @@ had_parent:
 }
 
 /**
+ * gst_bin_set_suppressed_flags:
+ * @bin: a #GstBin
+ * @flags: the #GstElementFlags to suppress
+ *
+ * Suppress the given flags on the bin. #GstElementFlags of a
+ * child element are propagated when it is added to the bin.
+ * When suppressed flags are set, those specified flags will
+ * not be propagated to the bin.
+ *
+ * MT safe.
+ *
+ * Since: 1.10
+ */
+void
+gst_bin_set_suppressed_flags (GstBin * bin, GstElementFlags flags)
+{
+  g_return_if_fail (GST_IS_BIN (bin));
+
+  GST_OBJECT_LOCK (bin);
+  bin->priv->suppressed_flags = bin->priv->suppressed_flags | flags;
+  GST_OBJECT_UNLOCK (bin);
+
+  GST_DEBUG_OBJECT (bin, "Set suppressed flags(0x%x) to bin '%s'", flags,
+      GST_ELEMENT_NAME (bin));
+}
+
+/**
+ * gst_bin_get_suppressed_flags:
+ * @bin: a #GstBin
+ *
+ * Return the suppressed flags of the bin.
+ *
+ * MT safe.
+ *
+ * Returns: the bin's suppressed #GstElementFlags.
+ *
+ * Since: 1.10
+ */
+GstElementFlags
+gst_bin_get_suppressed_flags (GstBin * bin)
+{
+  GstElementFlags res;
+
+  g_return_val_if_fail (GST_IS_BIN (bin), 0);
+
+  GST_OBJECT_LOCK (bin);
+  res = bin->priv->suppressed_flags;
+  GST_OBJECT_UNLOCK (bin);
+
+  return res;
+}
+
+/* signal vfunc, will be called when a new element was added */
+static void
+gst_bin_deep_element_added_func (GstBin * bin, GstBin * sub_bin,
+    GstElement * child)
+{
+  GstBin *parent_bin;
+
+  parent_bin = (GstBin *) gst_object_get_parent (GST_OBJECT_CAST (bin));
+  if (parent_bin == NULL) {
+    GST_LOG_OBJECT (bin, "no parent, reached top-level");
+    return;
+  }
+
+  GST_LOG_OBJECT (parent_bin, "emitting deep-element-added for element "
+      "%" GST_PTR_FORMAT " which has just been added to %" GST_PTR_FORMAT,
+      child, sub_bin);
+
+  g_signal_emit (parent_bin, gst_bin_signals[DEEP_ELEMENT_ADDED], 0, sub_bin,
+      child);
+
+  gst_object_unref (parent_bin);
+}
+
+/* signal vfunc, will be called when an element was removed */
+static void
+gst_bin_deep_element_removed_func (GstBin * bin, GstBin * sub_bin,
+    GstElement * child)
+{
+  GstBin *parent_bin;
+
+  parent_bin = (GstBin *) gst_object_get_parent (GST_OBJECT_CAST (bin));
+  if (parent_bin == NULL) {
+    GST_LOG_OBJECT (bin, "no parent, reached top-level");
+    return;
+  }
+
+  GST_LOG_OBJECT (parent_bin, "emitting deep-element-removed for element "
+      "%" GST_PTR_FORMAT " which has just been removed from %" GST_PTR_FORMAT,
+      sub_bin, child);
+
+  g_signal_emit (parent_bin, gst_bin_signals[DEEP_ELEMENT_REMOVED], 0, sub_bin,
+      child);
+
+  gst_object_unref (parent_bin);
+}
+
+/**
  * gst_bin_add:
  * @bin: a #GstBin
- * @element: (transfer full): the #GstElement to add
+ * @element: (transfer floating): the #GstElement to add
  *
  * Adds the given element to the bin.  Sets the element's parent, and thus
  * takes ownership of the element. An element can only be added to one bin.
@@ -1338,13 +1504,11 @@ had_parent:
  * If the element's pads are linked to other pads, the pads will be unlinked
  * before the element is added to the bin.
  *
- * <note>
- * When you add an element to an already-running pipeline, you will have to
- * take care to set the state of the newly-added element to the desired
- * state (usually PLAYING or PAUSED, same you set the pipeline to originally)
- * with gst_element_set_state(), or use gst_element_sync_state_with_parent().
- * The bin or pipeline will not take care of this for you.
- * </note>
+ * > When you add an element to an already-running pipeline, you will have to
+ * > take care to set the state of the newly-added element to the desired
+ * > state (usually PLAYING or PAUSED, same you set the pipeline to originally)
+ * > with gst_element_set_state(), or use gst_element_sync_state_with_parent().
+ * > The bin or pipeline will not take care of this for you.
  *
  * MT safe.
  *
@@ -1370,7 +1534,9 @@ gst_bin_add (GstBin * bin, GstElement * element)
       GST_STR_NULL (GST_ELEMENT_NAME (element)),
       GST_STR_NULL (GST_ELEMENT_NAME (bin)));
 
+  GST_TRACER_BIN_ADD_PRE (bin, element);
   result = bclass->add_element (bin, element);
+  GST_TRACER_BIN_ADD_POST (bin, element, result);
 
   return result;
 
@@ -1379,6 +1545,8 @@ no_function:
   {
     g_warning ("adding elements to bin '%s' is not supported",
         GST_ELEMENT_NAME (bin));
+    gst_object_ref_sink (element);
+    gst_object_unref (element);
     return FALSE;
   }
 }
@@ -1481,22 +1649,26 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
   if (!GST_BIN_IS_NO_RESYNC (bin))
     bin->priv->structure_cookie++;
 
-  if (is_sink && !othersink) {
+  if (is_sink && !othersink
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_SINK)) {
     /* we're not a sink anymore */
     GST_DEBUG_OBJECT (bin, "we removed the last sink");
     GST_OBJECT_FLAG_UNSET (bin, GST_ELEMENT_FLAG_SINK);
   }
-  if (is_source && !othersource) {
+  if (is_source && !othersource
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_SOURCE)) {
     /* we're not a source anymore */
     GST_DEBUG_OBJECT (bin, "we removed the last source");
     GST_OBJECT_FLAG_UNSET (bin, GST_ELEMENT_FLAG_SOURCE);
   }
-  if (provides_clock && !otherprovider) {
+  if (provides_clock && !otherprovider
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_PROVIDE_CLOCK)) {
     /* we're not a clock provider anymore */
     GST_DEBUG_OBJECT (bin, "we removed the last clock provider");
     GST_OBJECT_FLAG_UNSET (bin, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
   }
-  if (requires_clock && !otherrequirer) {
+  if (requires_clock && !otherrequirer
+      && !(bin->priv->suppressed_flags & GST_ELEMENT_FLAG_REQUIRE_CLOCK)) {
     /* we're not a clock requirer anymore */
     GST_DEBUG_OBJECT (bin, "we removed the last clock requirer");
     GST_OBJECT_FLAG_UNSET (bin, GST_ELEMENT_FLAG_REQUIRE_CLOCK);
@@ -1621,7 +1793,9 @@ no_state_recalc:
 
   /* unlink all linked pads */
   it = gst_element_iterate_pads (element);
-  gst_iterator_foreach (it, (GstIteratorForeachFunction) unlink_pads, NULL);
+  while (gst_iterator_foreach (it, (GstIteratorForeachFunction) unlink_pads,
+          NULL) == GST_ITERATOR_RESYNC)
+    gst_iterator_resync (it);
   gst_iterator_free (it);
 
   GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
@@ -1631,6 +1805,9 @@ no_state_recalc:
   gst_child_proxy_child_removed ((GstChildProxy *) bin, (GObject *) element,
       elem_name);
 
+  gst_bin_do_deep_add_remove (bin, gst_bin_signals[DEEP_ELEMENT_REMOVED],
+      "deep-element-removed", element);
+
   g_free (elem_name);
   /* element is really out of our control now */
   gst_object_unref (element);
@@ -1694,7 +1871,9 @@ gst_bin_remove (GstBin * bin, GstElement * element)
   GST_CAT_DEBUG (GST_CAT_PARENTAGE, "removing element %s from bin %s",
       GST_ELEMENT_NAME (element), GST_ELEMENT_NAME (bin));
 
+  GST_TRACER_BIN_REMOVE_PRE (bin, element);
   result = bclass->remove_element (bin, element);
+  GST_TRACER_BIN_REMOVE_POST (bin, result);
 
   return result;
 
@@ -1942,14 +2121,24 @@ typedef struct _GstBinSortIterator
 } GstBinSortIterator;
 
 static void
+copy_to_queue (gpointer data, gpointer user_data)
+{
+  GstElement *element = data;
+  GQueue *queue = user_data;
+
+  gst_object_ref (element);
+  g_queue_push_tail (queue, element);
+}
+
+static void
 gst_bin_sort_iterator_copy (const GstBinSortIterator * it,
     GstBinSortIterator * copy)
 {
   GHashTableIter iter;
   gpointer key, value;
 
-  copy->queue = it->queue;
-  g_queue_foreach (&copy->queue, (GFunc) gst_object_ref, NULL);
+  g_queue_init (&copy->queue);
+  g_queue_foreach ((GQueue *) & it->queue, copy_to_queue, &copy->queue);
 
   copy->bin = gst_object_ref (it->bin);
   if (it->best)
@@ -2134,6 +2323,12 @@ find_element (GstElement * element, GstBinSortIterator * bit)
   if (bit->best == NULL || bit->best_deg > degree) {
     bit->best = element;
     bit->best_deg = degree;
+  } else if (bit->best_deg == degree
+      && GST_OBJECT_FLAG_IS_SET (bit->best, GST_ELEMENT_FLAG_SOURCE)
+      && !GST_OBJECT_FLAG_IS_SET (element, GST_ELEMENT_FLAG_SOURCE)) {
+    /* If two elements have the same degree, we want to ensure we
+     * return non-source elements first. */
+    bit->best = element;
   }
 }
 
@@ -2381,7 +2576,7 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element,
 do_state:
   GST_OBJECT_LOCK (bin);
   /* the element was busy with an upwards async state change, we must wait for
-   * an ASYNC_DONE message before we attemp to change the state. */
+   * an ASYNC_DONE message before we attempt to change the state. */
   if ((found =
           find_message (bin, GST_OBJECT_CAST (element),
               GST_MESSAGE_ASYNC_START))) {
@@ -2664,14 +2859,18 @@ gst_bin_change_state_func (GstElement * element, GstStateChange transition)
   switch (next) {
     case GST_STATE_PLAYING:
     {
-      gboolean toplevel;
+      gboolean toplevel, asynchandling;
 
       GST_OBJECT_LOCK (bin);
       toplevel = BIN_IS_TOPLEVEL (bin);
+      asynchandling = bin->priv->asynchandling;
       GST_OBJECT_UNLOCK (bin);
 
       if (toplevel)
         gst_bin_recalculate_latency (bin);
+      if (asynchandling)
+        gst_element_post_message (element,
+            gst_message_new_latency (GST_OBJECT_CAST (element)));
       break;
     }
     case GST_STATE_PAUSED:
@@ -2699,6 +2898,11 @@ gst_bin_change_state_func (GstElement * element, GstStateChange transition)
         goto activate_failure;
       break;
     case GST_STATE_NULL:
+      /* Clear message list on next NULL */
+      GST_OBJECT_LOCK (bin);
+      GST_DEBUG_OBJECT (element, "clearing all cached messages");
+      bin_remove_messages (bin, NULL, GST_MESSAGE_ANY);
+      GST_OBJECT_UNLOCK (bin);
       if (current == GST_STATE_READY) {
         if (!(gst_bin_src_pads_activate (bin, FALSE)))
           goto activate_failure;
@@ -2857,12 +3061,12 @@ done:
   /* check if all elements managed to commit their state already */
   if (!find_message (bin, NULL, GST_MESSAGE_ASYNC_START)) {
     /* nothing found, remove all old ASYNC_DONE messages. This can happen when
-     * all the elements commited their state while we were doing the state
+     * all the elements committed their state while we were doing the state
      * change. We will still return ASYNC for consistency but we commit the
      * state already so that a _get_state() will return immediately. */
     bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
 
-    GST_DEBUG_OBJECT (bin, "async elements commited");
+    GST_DEBUG_OBJECT (bin, "async elements committed");
     bin_handle_async_done (bin, GST_STATE_CHANGE_SUCCESS, FALSE,
         GST_CLOCK_TIME_NONE);
   }
@@ -2897,10 +3101,13 @@ undo:
       GST_DEBUG_OBJECT (element,
           "Bin failed to change state, switching children back to %s",
           gst_element_state_get_name (current));
-      do {
+      while (TRUE) {
         ret =
             gst_iterator_foreach (it, &reset_state, GINT_TO_POINTER (current));
-      } while (ret == GST_ITERATOR_RESYNC);
+        if (ret != GST_ITERATOR_RESYNC)
+          break;
+        gst_iterator_resync (it);
+      }
       gst_iterator_free (it);
     }
     goto done;
@@ -2908,13 +3115,11 @@ undo:
 }
 
 /*
- * This function is a utility event handler for seek events.
- * It will send the event to all sinks or sources and appropriate
- * ghost pads depending on the event-direction.
+ * This function is a utility event handler. It will send the event to all sinks
+ * or sources and appropriate ghost pads depending on the event-direction.
  *
- * Applications are free to override this behaviour and
- * implement their own seek handler, but this will work for
- * pretty much all cases in practice.
+ * Applications are free to override this behaviour and implement their own
+ * handler, but this will work for pretty much all cases in practice.
  */
 static gboolean
 gst_bin_send_event (GstElement * element, GstEvent * event)
@@ -3012,13 +3217,11 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
  * their state, this function will attempt to bring the bin to the next state.
  */
 static void
-gst_bin_continue_func (BinContinueData * data)
+gst_bin_continue_func (GstBin * bin, BinContinueData * data)
 {
-  GstBin *bin;
   GstState current, next, pending;
   GstStateChange transition;
 
-  bin = data->bin;
   pending = data->pending;
 
   GST_DEBUG_OBJECT (bin, "waiting for state lock");
@@ -3052,8 +3255,6 @@ gst_bin_continue_func (BinContinueData * data)
   GST_STATE_UNLOCK (bin);
   GST_DEBUG_OBJECT (bin, "state continue done");
 
-  gst_object_unref (bin);
-  g_slice_free (BinContinueData, data);
   return;
 
 interrupted:
@@ -3061,8 +3262,6 @@ interrupted:
     GST_OBJECT_UNLOCK (bin);
     GST_STATE_UNLOCK (bin);
     GST_DEBUG_OBJECT (bin, "state continue aborted due to intervening change");
-    gst_object_unref (bin);
-    g_slice_free (BinContinueData, data);
     return;
   }
 }
@@ -3082,17 +3281,18 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
 }
 
 static void
-bin_push_state_continue (BinContinueData * data)
+free_bin_continue_data (BinContinueData * data)
 {
-  GstBinClass *klass;
-  GstBin *bin;
-
-  /* ref was taken */
-  bin = data->bin;
-  klass = GST_BIN_GET_CLASS (bin);
+  g_slice_free (BinContinueData, data);
+}
 
+static void
+bin_push_state_continue (GstBin * bin, BinContinueData * data)
+{
   GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
-  g_thread_pool_push (klass->pool, data, NULL);
+  gst_element_call_async (GST_ELEMENT_CAST (bin),
+      (GstElementCallAsyncFunc) gst_bin_continue_func, data,
+      (GDestroyNotify) free_bin_continue_data);
 }
 
 /* an element started an async state change, if we were not busy with a state
@@ -3255,8 +3455,6 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
 
     cont = g_slice_new (BinContinueData);
 
-    /* ref to the bin */
-    cont->bin = gst_object_ref (bin);
     /* cookie to detect concurrent state change */
     cont->cookie = GST_ELEMENT_CAST (bin)->state_cookie;
     /* pending target state */
@@ -3287,7 +3485,7 @@ bin_handle_async_done (GstBin * bin, GstStateChangeReturn ret,
   if (cont) {
     /* toplevel, start continue state */
     GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
-    bin_push_state_continue (cont);
+    bin_push_state_continue (bin, cont);
   } else {
     GST_DEBUG_OBJECT (bin, "state change complete");
     GST_STATE_BROADCAST (bin);
@@ -3318,15 +3516,17 @@ nothing_pending:
 static void
 bin_do_eos (GstBin * bin)
 {
-  guint32 seqnum = 0;
+  guint32 seqnum = GST_SEQNUM_INVALID;
   gboolean eos;
 
   GST_OBJECT_LOCK (bin);
   /* If all sinks are EOS, we're in PLAYING and no state change is pending
-   * we forward the EOS message to the parent bin or application
+   * (or we're doing playing to playing and no one else will trigger posting
+   * EOS for us) we forward the EOS message to the parent bin or application
    */
   eos = GST_STATE (bin) == GST_STATE_PLAYING
-      && GST_STATE_PENDING (bin) == GST_STATE_VOID_PENDING
+      && (GST_STATE_PENDING (bin) == GST_STATE_VOID_PENDING ||
+      GST_STATE_PENDING (bin) == GST_STATE_PLAYING)
       && bin->priv->posted_playing && is_eos (bin, &seqnum);
   GST_OBJECT_UNLOCK (bin);
 
@@ -3345,17 +3545,21 @@ bin_do_eos (GstBin * bin)
     GST_OBJECT_UNLOCK (bin);
 
     tmessage = gst_message_new_eos (GST_OBJECT_CAST (bin));
-    gst_message_set_seqnum (tmessage, seqnum);
+    if (seqnum != GST_SEQNUM_INVALID)
+      gst_message_set_seqnum (tmessage, seqnum);
     GST_DEBUG_OBJECT (bin,
         "all sinks posted EOS, posting seqnum #%" G_GUINT32_FORMAT, seqnum);
     gst_element_post_message (GST_ELEMENT_CAST (bin), tmessage);
+  } else {
+    GST_LOG_OBJECT (bin, "Not forwarding EOS due to in progress state change, "
+        " or already posted, or waiting for more EOS");
   }
 }
 
 static void
 bin_do_stream_start (GstBin * bin)
 {
-  guint32 seqnum = 0;
+  guint32 seqnum = GST_SEQNUM_INVALID;
   gboolean stream_start;
   gboolean have_group_id = FALSE;
   guint group_id = 0;
@@ -3375,7 +3579,8 @@ bin_do_stream_start (GstBin * bin)
     GST_OBJECT_UNLOCK (bin);
 
     tmessage = gst_message_new_stream_start (GST_OBJECT_CAST (bin));
-    gst_message_set_seqnum (tmessage, seqnum);
+    if (seqnum != GST_SEQNUM_INVALID)
+      gst_message_set_seqnum (tmessage, seqnum);
     if (have_group_id)
       gst_message_set_group_id (tmessage, group_id);
 
@@ -3752,7 +3957,7 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
         /* nothing found, remove all old ASYNC_DONE messages */
         bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
 
-        GST_DEBUG_OBJECT (bin, "async elements commited");
+        GST_DEBUG_OBJECT (bin, "async elements committed");
         /* when we get an async done message when a state change was busy, we
          * need to set the pending_done flag so that at the end of the state
          * change we can see if we need to verify pending async elements, hence
@@ -3808,24 +4013,32 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
       GList *l, *contexts;
 
       gst_message_parse_context_type (message, &context_type);
-      GST_OBJECT_LOCK (bin);
-      contexts = GST_ELEMENT_CAST (bin)->contexts;
-      GST_LOG_OBJECT (bin, "got need-context message type: %s", context_type);
-      for (l = contexts; l; l = l->next) {
-        GstContext *tmp = l->data;
-        const gchar *tmp_type = gst_context_get_context_type (tmp);
-
-        if (strcmp (context_type, tmp_type) == 0) {
-          gst_element_set_context (GST_ELEMENT (src), l->data);
-          break;
+
+      if (src) {
+        GST_OBJECT_LOCK (bin);
+        contexts = GST_ELEMENT_CAST (bin)->contexts;
+        GST_LOG_OBJECT (bin, "got need-context message type: %s", context_type);
+        for (l = contexts; l; l = l->next) {
+          GstContext *tmp = l->data;
+          const gchar *tmp_type = gst_context_get_context_type (tmp);
+
+          if (strcmp (context_type, tmp_type) == 0) {
+            gst_element_set_context (GST_ELEMENT (src), l->data);
+            break;
+          }
         }
-      }
-      GST_OBJECT_UNLOCK (bin);
+        GST_OBJECT_UNLOCK (bin);
 
-      /* Forward if we couldn't answer the message */
-      if (l == NULL) {
-        goto forward;
+        /* Forward if we couldn't answer the message */
+        if (l == NULL) {
+          goto forward;
+        } else {
+          gst_message_unref (message);
+        }
       } else {
+        g_warning
+            ("Got need-context message in bin '%s' without source element, dropping",
+            GST_ELEMENT_NAME (bin));
         gst_message_unref (message);
       }
 
@@ -4128,7 +4341,17 @@ gst_bin_query (GstElement * element, GstQuery * query)
       }
       GST_OBJECT_UNLOCK (bin);
 #else
-      GST_FIXME ("implement duration caching in GstBin again");
+#ifndef GST_DISABLE_GST_DEBUG
+      G_STMT_START {
+        /* Quieten this particularly annoying FIXME a bit: */
+        static gboolean printed_fixme = FALSE;
+        if (!printed_fixme) {
+          GST_FIXME ("implement duration caching in GstBin again");
+          printed_fixme = TRUE;
+        }
+      }
+      G_STMT_END;
+#endif
 #endif
       /* no cached value found, iterate and collect durations */
       fold_func = (GstIteratorFoldFunction) bin_query_duration_fold;
@@ -4339,7 +4562,8 @@ compare_interface (const GValue * velement, GValue * interface)
  *
  * MT safe.  Caller owns returned reference.
  *
- * Returns: (transfer full): A #GstElement inside the bin implementing the interface
+ * Returns: (transfer full) (nullable): A #GstElement inside the bin
+ * implementing the interface
  */
 GstElement *
 gst_bin_get_by_interface (GstBin * bin, GType iface)