element: Enforce that elements created by gst_element_factory_create/make() are floating
[platform/upstream/gstreamer.git] / gst / gstpad.c
index e835e29..dcd52f2 100644 (file)
@@ -21,6 +21,7 @@
  */
 /**
  * SECTION:gstpad
+ * @title: GstPad
  * @short_description: Object contained by elements that allows links to
  *                     other elements
  * @see_also: #GstPadTemplate, #GstElement, #GstEvent, #GstQuery, #GstBuffer
@@ -94,6 +95,7 @@
 #include "gstutils.h"
 #include "gstinfo.h"
 #include "gsterror.h"
+#include "gsttracerutils.h"
 #include "gstvalue.h"
 #include "glib-compat-private.h"
 
@@ -115,11 +117,11 @@ enum
   PAD_PROP_CAPS,
   PAD_PROP_DIRECTION,
   PAD_PROP_TEMPLATE,
-  /* FILL ME */
+  PAD_PROP_OFFSET
+      /* FILL ME */
 };
 
-#define GST_PAD_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PAD, GstPadPrivate))
+#define _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH (GST_PAD_PROBE_TYPE_ALL_BOTH | GST_PAD_PROBE_TYPE_EVENT_FLUSH)
 
 /* we have a pending and an active event on the pad. On source pads only the
  * active event is used. On sinkpads, events are copied to the pending entry and
@@ -138,16 +140,25 @@ struct _GstPadPrivate
 
   gint using;
   guint probe_list_cookie;
-  guint probe_cookie;
+
+  /* counter of how many idle probes are running directly from the add_probe
+   * call. Used to block any data flowing in the pad while the idle callback
+   * Doesn't finish its work */
+  gint idle_running;
+
+  /* conditional and variable used to ensure pads only get (de)activated
+   * by a single thread at a time. Protected by the object lock */
+  GCond activation_cond;
+  gboolean in_activation;
 };
 
 typedef struct
 {
   GHook hook;
-  guint cookie;
 } GstProbe;
 
-#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie)
+#define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \
+    (((GstPad *)(p))->priv->idle_running > 0)
 
 typedef struct
 {
@@ -155,8 +166,13 @@ typedef struct
   GstPadProbeInfo *info;
   gboolean dropped;
   gboolean pass;
+  gboolean handled;
   gboolean marshalled;
-  guint cookie;
+
+  gulong *called_probes;
+  guint n_called_probes;
+  guint called_probes_size;
+  gboolean retry;
 } ProbeMarshall;
 
 static void gst_pad_dispose (GObject * object);
@@ -176,6 +192,9 @@ static GstFlowReturn gst_pad_send_event_unchecked (GstPad * pad,
 static GstFlowReturn gst_pad_push_event_unchecked (GstPad * pad,
     GstEvent * event, GstPadProbeType type);
 
+static gboolean activate_mode_internal (GstPad * pad, GstObject * parent,
+    GstPadMode mode, gboolean active);
+
 static guint gst_pad_signals[LAST_SIGNAL] = { 0 };
 
 static GParamSpec *pspec_caps = NULL;
@@ -298,7 +317,8 @@ gst_pad_link_get_name (GstPadLinkReturn ret)
 }
 
 #define gst_pad_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT, _do_init);
+G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT,
+    G_ADD_PRIVATE (GstPad) _do_init);
 
 static void
 gst_pad_class_init (GstPadClass * klass)
@@ -309,8 +329,6 @@ gst_pad_class_init (GstPadClass * klass)
   gobject_class = G_OBJECT_CLASS (klass);
   gstobject_class = GST_OBJECT_CLASS (klass);
 
-  g_type_class_add_private (klass, sizeof (GstPadPrivate));
-
   gobject_class->dispose = gst_pad_dispose;
   gobject_class->finalize = gst_pad_finalize;
   gobject_class->set_property = gst_pad_set_property;
@@ -355,6 +373,18 @@ gst_pad_class_init (GstPadClass * klass)
           "The GstPadTemplate of this pad", GST_TYPE_PAD_TEMPLATE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstPad:offset:
+   *
+   * The offset that will be applied to the running time of the pad.
+   *
+   * Since: 1.6
+   */
+  g_object_class_install_property (gobject_class, PAD_PROP_OFFSET,
+      g_param_spec_int64 ("offset", "Offset",
+          "The running time offset of the pad", 0, G_MAXINT64, 0,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstobject_class->path_string_separator = ".";
 
   /* Register common function pointer descriptions */
@@ -368,7 +398,7 @@ gst_pad_class_init (GstPadClass * klass)
 static void
 gst_pad_init (GstPad * pad)
 {
-  pad->priv = GST_PAD_GET_PRIVATE (pad);
+  pad->priv = gst_pad_get_instance_private (pad);
 
   GST_PAD_DIRECTION (pad) = GST_PAD_UNKNOWN;
 
@@ -389,6 +419,8 @@ gst_pad_init (GstPad * pad)
   pad->priv->events = g_array_sized_new (FALSE, TRUE, sizeof (PadEvent), 16);
   pad->priv->events_cookie = 0;
   pad->priv->last_cookie = -1;
+  g_cond_init (&pad->priv->activation_cond);
+
   pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
 }
 
@@ -450,6 +482,8 @@ find_event_by_type (GstPad * pad, GstEventType type, guint idx)
       if (idx == 0)
         goto found;
       idx--;
+    } else if (GST_EVENT_TYPE (ev->event) > type) {
+      break;
     }
   }
   ev = NULL;
@@ -472,6 +506,8 @@ find_event (GstPad * pad, GstEvent * event)
     ev = &g_array_index (events, PadEvent, i);
     if (event == ev->event)
       goto found;
+    else if (GST_EVENT_TYPE (ev->event) > GST_EVENT_TYPE (event))
+      break;
   }
   ev = NULL;
 found:
@@ -495,7 +531,9 @@ remove_event_by_type (GstPad * pad, GstEventType type)
     if (ev->event == NULL)
       goto next;
 
-    if (GST_EVENT_TYPE (ev->event) != type)
+    if (GST_EVENT_TYPE (ev->event) > type)
+      break;
+    else if (GST_EVENT_TYPE (ev->event) != type)
       goto next;
 
     gst_event_unref (ev->event);
@@ -563,7 +601,7 @@ restart:
     if (G_UNLIKELY (ev->event == NULL))
       goto next;
 
-    /* take aditional ref, func might release the lock */
+    /* take additional ref, func might release the lock */
     ev_ret.event = gst_event_ref (ev->event);
     ev_ret.received = ev->received;
 
@@ -606,36 +644,43 @@ restart:
 
 /* should be called with LOCK */
 static GstEvent *
-apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream)
+_apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream,
+    gint64 pad_offset)
 {
-  /* check if we need to adjust the segment */
-  if (pad->offset != 0) {
-    gint64 offset;
-
-    GST_DEBUG_OBJECT (pad, "apply pad offset %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (pad->offset));
+  gint64 offset;
 
-    if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
-      GstSegment segment;
+  GST_DEBUG_OBJECT (pad, "apply pad offset %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (pad_offset));
 
-      g_assert (!upstream);
+  if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
+    GstSegment segment;
 
-      /* copy segment values */
-      gst_event_copy_segment (event, &segment);
-      gst_event_unref (event);
+    g_assert (!upstream);
 
-      gst_segment_offset_running_time (&segment, segment.format, pad->offset);
-      event = gst_event_new_segment (&segment);
-    }
+    /* copy segment values */
+    gst_event_copy_segment (event, &segment);
+    gst_event_unref (event);
 
-    event = gst_event_make_writable (event);
-    offset = gst_event_get_running_time_offset (event);
-    if (upstream)
-      offset -= pad->offset;
-    else
-      offset += pad->offset;
-    gst_event_set_running_time_offset (event, offset);
+    gst_segment_offset_running_time (&segment, segment.format, pad_offset);
+    event = gst_event_new_segment (&segment);
   }
+
+  event = gst_event_make_writable (event);
+  offset = gst_event_get_running_time_offset (event);
+  if (upstream)
+    offset -= pad_offset;
+  else
+    offset += pad_offset;
+  gst_event_set_running_time_offset (event, offset);
+
+  return event;
+}
+
+static inline GstEvent *
+apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream)
+{
+  if (G_UNLIKELY (pad->offset != 0))
+    return _apply_pad_offset (pad, event, upstream, pad->offset);
   return event;
 }
 
@@ -659,7 +704,7 @@ gst_pad_dispose (GObject * object)
   GstPad *pad = GST_PAD_CAST (object);
   GstPad *peer;
 
-  GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, pad, "dispose");
+  GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, pad, "%p dispose", pad);
 
   /* unlink the peer pad */
   if ((peer = gst_pad_get_peer (pad))) {
@@ -721,6 +766,7 @@ gst_pad_finalize (GObject * object)
 
   g_rec_mutex_clear (&pad->stream_rec_lock);
   g_cond_clear (&pad->block_cond);
+  g_cond_clear (&pad->priv->activation_cond);
   g_array_free (pad->priv->events, TRUE);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -740,6 +786,9 @@ gst_pad_set_property (GObject * object, guint prop_id,
       gst_pad_set_pad_template (GST_PAD_CAST (object),
           (GstPadTemplate *) g_value_get_object (value));
       break;
+    case PAD_PROP_OFFSET:
+      gst_pad_set_offset (GST_PAD_CAST (object), g_value_get_int64 (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -764,6 +813,9 @@ gst_pad_get_property (GObject * object, guint prop_id,
     case PAD_PROP_TEMPLATE:
       g_value_set_object (value, GST_PAD_PAD_TEMPLATE (object));
       break;
+    case PAD_PROP_OFFSET:
+      g_value_set_int64 (value, gst_pad_get_offset (GST_PAD_CAST (object)));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -780,8 +832,7 @@ gst_pad_get_property (GObject * object, guint prop_id,
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  *
  * MT safe.
  */
@@ -802,15 +853,18 @@ gst_pad_new (const gchar * name, GstPadDirection direction)
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  */
 GstPad *
 gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name)
 {
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
+
   g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
 
-  return g_object_new (GST_TYPE_PAD,
+  return g_object_new (pad_type,
       "name", name, "direction", templ->direction, "template", templ, NULL);
 }
 
@@ -824,8 +878,7 @@ gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name)
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  */
 GstPad *
 gst_pad_new_from_static_template (GstStaticPadTemplate * templ,
@@ -883,7 +936,9 @@ gst_pad_get_direction (GstPad * pad)
 static gboolean
 gst_pad_activate_default (GstPad * pad, GstObject * parent)
 {
-  return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE);
+  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+  return activate_mode_internal (pad, parent, GST_PAD_MODE_PUSH, TRUE);
 }
 
 /**
@@ -910,12 +965,22 @@ gst_pad_mode_get_name (GstPadMode mode)
   return "unknown";
 }
 
-static void
+/* Returns TRUE if pad wasn't already in the new_mode */
+static gboolean
 pre_activate (GstPad * pad, GstPadMode new_mode)
 {
   switch (new_mode) {
     case GST_PAD_MODE_NONE:
       GST_OBJECT_LOCK (pad);
+      while (G_UNLIKELY (pad->priv->in_activation))
+        g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad));
+      if (new_mode == GST_PAD_MODE (pad)) {
+        GST_WARNING_OBJECT (pad,
+            "Pad is already in the process of being deactivated");
+        GST_OBJECT_UNLOCK (pad);
+        return FALSE;
+      }
+      pad->priv->in_activation = TRUE;
       GST_DEBUG_OBJECT (pad, "setting PAD_MODE NONE, set flushing");
       GST_PAD_SET_FLUSHING (pad);
       pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
@@ -927,6 +992,15 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
     case GST_PAD_MODE_PUSH:
     case GST_PAD_MODE_PULL:
       GST_OBJECT_LOCK (pad);
+      while (G_UNLIKELY (pad->priv->in_activation))
+        g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad));
+      if (new_mode == GST_PAD_MODE (pad)) {
+        GST_WARNING_OBJECT (pad,
+            "Pad is already in the process of being activated");
+        GST_OBJECT_UNLOCK (pad);
+        return FALSE;
+      }
+      pad->priv->in_activation = TRUE;
       GST_DEBUG_OBJECT (pad, "setting pad into %s mode, unset flushing",
           gst_pad_mode_get_name (new_mode));
       GST_PAD_UNSET_FLUSHING (pad);
@@ -955,6 +1029,7 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
       }
       break;
   }
+  return TRUE;
 }
 
 static void
@@ -962,6 +1037,11 @@ post_activate (GstPad * pad, GstPadMode new_mode)
 {
   switch (new_mode) {
     case GST_PAD_MODE_NONE:
+      GST_OBJECT_LOCK (pad);
+      pad->priv->in_activation = FALSE;
+      g_cond_broadcast (&pad->priv->activation_cond);
+      GST_OBJECT_UNLOCK (pad);
+
       /* ensures that streaming stops */
       GST_PAD_STREAM_LOCK (pad);
       GST_DEBUG_OBJECT (pad, "stopped streaming");
@@ -972,6 +1052,10 @@ post_activate (GstPad * pad, GstPadMode new_mode)
       break;
     case GST_PAD_MODE_PUSH:
     case GST_PAD_MODE_PULL:
+      GST_OBJECT_LOCK (pad);
+      pad->priv->in_activation = FALSE;
+      g_cond_broadcast (&pad->priv->activation_cond);
+      GST_OBJECT_UNLOCK (pad);
       /* NOP */
       break;
   }
@@ -1028,7 +1112,7 @@ gst_pad_set_active (GstPad * pad, gboolean active)
     } else {
       GST_DEBUG_OBJECT (pad, "deactivating pad from %s mode",
           gst_pad_mode_get_name (old));
-      ret = gst_pad_activate_mode (pad, old, FALSE);
+      ret = activate_mode_internal (pad, parent, old, FALSE);
       if (ret)
         pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
     }
@@ -1062,36 +1146,18 @@ failed:
   }
 }
 
-/**
- * gst_pad_activate_mode:
- * @pad: the #GstPad to activate or deactivate.
- * @mode: the requested activation mode
- * @active: whether or not the pad should be active.
- *
- * Activates or deactivates the given pad in @mode via dispatching to the
- * pad's activatemodefunc. For use from within pad activation functions only.
- *
- * If you don't know what this is, you probably don't want to call it.
- *
- * Returns: %TRUE if the operation was successful.
- *
- * MT safe.
- */
-gboolean
-gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
+static gboolean
+activate_mode_internal (GstPad * pad, GstObject * parent, GstPadMode mode,
+    gboolean active)
 {
   gboolean res = FALSE;
-  GstObject *parent;
   GstPadMode old, new;
   GstPadDirection dir;
   GstPad *peer;
 
-  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
-
   GST_OBJECT_LOCK (pad);
   old = GST_PAD_MODE (pad);
   dir = GST_PAD_DIRECTION (pad);
-  ACQUIRE_PARENT (pad, parent, no_parent);
   GST_OBJECT_UNLOCK (pad);
 
   new = active ? mode : GST_PAD_MODE_NONE;
@@ -1105,8 +1171,9 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
     GST_DEBUG_OBJECT (pad, "deactivating pad from %s mode",
         gst_pad_mode_get_name (old));
 
-    if (G_UNLIKELY (!gst_pad_activate_mode (pad, old, FALSE)))
+    if (G_UNLIKELY (!activate_mode_internal (pad, parent, old, FALSE)))
       goto deactivate_failed;
+    old = GST_PAD_MODE_NONE;
   }
 
   switch (mode) {
@@ -1141,17 +1208,21 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
   /* Mark pad as needing reconfiguration */
   if (active)
     GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE);
-  pre_activate (pad, new);
 
-  if (GST_PAD_ACTIVATEMODEFUNC (pad)) {
-    if (G_UNLIKELY (!GST_PAD_ACTIVATEMODEFUNC (pad) (pad, parent, mode,
-                active)))
-      goto failure;
-  } else {
-    /* can happen for sinks of passthrough elements */
-  }
+  /* pre_activate returns TRUE if we weren't already in the process of
+   * switching to the 'new' mode */
+  if (pre_activate (pad, new)) {
+
+    if (GST_PAD_ACTIVATEMODEFUNC (pad)) {
+      if (G_UNLIKELY (!GST_PAD_ACTIVATEMODEFUNC (pad) (pad, parent, mode,
+                  active)))
+        goto failure;
+    } else {
+      /* can happen for sinks of passthrough elements */
+    }
 
-  post_activate (pad, new);
+    post_activate (pad, new);
+  }
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "%s in %s mode",
       active ? "activated" : "deactivated", gst_pad_mode_get_name (mode));
@@ -1168,16 +1239,8 @@ exit_success:
   }
 
 exit:
-  RELEASE_PARENT (parent);
-
   return res;
 
-no_parent:
-  {
-    GST_DEBUG_OBJECT (pad, "no parent");
-    GST_OBJECT_UNLOCK (pad);
-    return FALSE;
-  }
 was_ok:
   {
     GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "already %s in %s mode",
@@ -1214,12 +1277,70 @@ failure:
         active ? "activate" : "deactivate", gst_pad_mode_get_name (mode));
     GST_PAD_SET_FLUSHING (pad);
     GST_PAD_MODE (pad) = old;
+    pad->priv->in_activation = FALSE;
+    g_cond_broadcast (&pad->priv->activation_cond);
     GST_OBJECT_UNLOCK (pad);
     goto exit;
   }
 }
 
 /**
+ * gst_pad_activate_mode:
+ * @pad: the #GstPad to activate or deactivate.
+ * @mode: the requested activation mode
+ * @active: whether or not the pad should be active.
+ *
+ * Activates or deactivates the given pad in @mode via dispatching to the
+ * pad's activatemodefunc. For use from within pad activation functions only.
+ *
+ * If you don't know what this is, you probably don't want to call it.
+ *
+ * Returns: %TRUE if the operation was successful.
+ *
+ * MT safe.
+ */
+gboolean
+gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
+{
+  GstObject *parent;
+  gboolean res;
+  GstPadMode old, new;
+
+  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+  GST_OBJECT_LOCK (pad);
+
+  old = GST_PAD_MODE (pad);
+  new = active ? mode : GST_PAD_MODE_NONE;
+  if (old == new)
+    goto was_ok;
+
+  ACQUIRE_PARENT (pad, parent, no_parent);
+
+  GST_OBJECT_UNLOCK (pad);
+
+  res = activate_mode_internal (pad, parent, mode, active);
+
+  RELEASE_PARENT (parent);
+
+  return res;
+
+was_ok:
+  {
+    GST_OBJECT_UNLOCK (pad);
+    GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "already %s in %s mode",
+        active ? "activated" : "deactivated", gst_pad_mode_get_name (mode));
+    return TRUE;
+  }
+no_parent:
+  {
+    GST_WARNING_OBJECT (pad, "no parent");
+    GST_OBJECT_UNLOCK (pad);
+    return FALSE;
+  }
+}
+
+/**
  * gst_pad_is_active:
  * @pad: the #GstPad to query
  *
@@ -1248,6 +1369,9 @@ cleanup_hook (GstPad * pad, GHook * hook)
 {
   GstPadProbeType type;
 
+  GST_DEBUG_OBJECT (pad,
+      "cleaning up hook %lu with flags %08x", hook->hook_id, hook->flags);
+
   if (!G_HOOK_IS_VALID (hook))
     return;
 
@@ -1283,9 +1407,16 @@ cleanup_hook (GstPad * pad, GHook * hook)
  * Be notified of different states of pads. The provided callback is called for
  * every state that matches @mask.
  *
+ * Probes are called in groups: First GST_PAD_PROBE_TYPE_BLOCK probes are
+ * called, then others, then finally GST_PAD_PROBE_TYPE_IDLE. The only
+ * exception here are GST_PAD_PROBE_TYPE_IDLE probes that are called
+ * immediately if the pad is already idle while calling gst_pad_add_probe().
+ * In each of the groups, probes are called in the order in which they were
+ * added.
+ *
  * Returns: an id or 0 if no probe is pending. The id can be used to remove the
  * probe with gst_pad_remove_probe(). When using GST_PAD_PROBE_TYPE_IDLE it can
- * happend that the probe can be run immediately and if the probe returns
+ * happen that the probe can be run immediately and if the probe returns
  * GST_PAD_PROBE_REMOVE this functions returns 0.
  *
  * MT safe.
@@ -1309,9 +1440,9 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "adding probe for mask 0x%08x",
       mask);
 
-  /* when no contraints are given for the types, assume all types are
+  /* when no constraints are given for the types, assume all types are
    * acceptable */
-  if ((mask & GST_PAD_PROBE_TYPE_ALL_BOTH) == 0)
+  if ((mask & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH) == 0)
     mask |= GST_PAD_PROBE_TYPE_ALL_BOTH;
   if ((mask & GST_PAD_PROBE_TYPE_SCHEDULING) == 0)
     mask |= GST_PAD_PROBE_TYPE_SCHEDULING;
@@ -1321,12 +1452,11 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
   hook->func = callback;
   hook->data = user_data;
   hook->destroy = destroy_data;
-  PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1);
 
   /* add the probe */
-  g_hook_prepend (&pad->probes, hook);
+  g_hook_append (&pad->probes, hook);
   pad->num_probes++;
-  /* incremenent cookie so that the new hook get's called */
+  /* incremenent cookie so that the new hook gets called */
   pad->priv->probe_list_cookie++;
 
   /* get the id of the hook, we return this and it can be used to remove the
@@ -1361,6 +1491,7 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
 
       /* Keep another ref, the callback could destroy the pad */
       gst_object_ref (pad);
+      pad->priv->idle_running++;
 
       /* the pad is idle now, we can signal the idle callback now */
       GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
@@ -1386,10 +1517,17 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
         case GST_PAD_PROBE_OK:
           GST_DEBUG_OBJECT (pad, "probe returned OK");
           break;
+        case GST_PAD_PROBE_HANDLED:
+          GST_DEBUG_OBJECT (pad, "probe handled the data");
+          break;
         default:
           GST_DEBUG_OBJECT (pad, "probe returned %d", ret);
           break;
       }
+      pad->priv->idle_running--;
+      if (pad->priv->idle_running == 0) {
+        GST_PAD_BLOCK_BROADCAST (pad);
+      }
       GST_OBJECT_UNLOCK (pad);
 
       gst_object_unref (pad);
@@ -1770,6 +1908,53 @@ gst_pad_set_event_function_full (GstPad * pad, GstPadEventFunction event,
       GST_DEBUG_FUNCPTR_NAME (event));
 }
 
+static gboolean
+event_wrap (GstPad * pad, GstObject * object, GstEvent * event)
+{
+  GstFlowReturn ret;
+
+  ret = GST_PAD_EVENTFULLFUNC (pad) (pad, object, event);
+  if (ret == GST_FLOW_OK)
+    return TRUE;
+  return FALSE;
+}
+
+/**
+ * gst_pad_set_event_full_function:
+ * @p: a #GstPad of either direction.
+ * @f: the #GstPadEventFullFunction to set.
+ *
+ * Calls gst_pad_set_event_full_function_full() with %NULL for the user_data and
+ * notify.
+ */
+/**
+ * gst_pad_set_event_full_function_full:
+ * @pad: a #GstPad of either direction.
+ * @event: the #GstPadEventFullFunction to set.
+ * @user_data: user_data passed to @notify
+ * @notify: notify called when @event will not be used anymore.
+ *
+ * Sets the given event handler for the pad.
+ *
+ * Since: 1.8
+ */
+void
+gst_pad_set_event_full_function_full (GstPad * pad,
+    GstPadEventFullFunction event, gpointer user_data, GDestroyNotify notify)
+{
+  g_return_if_fail (GST_IS_PAD (pad));
+
+  if (pad->eventnotify)
+    pad->eventnotify (pad->eventdata);
+  GST_PAD_EVENTFULLFUNC (pad) = event;
+  GST_PAD_EVENTFUNC (pad) = event_wrap;
+  pad->eventdata = user_data;
+  pad->eventnotify = notify;
+
+  GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "eventfullfunc for set to %s",
+      GST_DEBUG_FUNCPTR_NAME (event));
+}
+
 /**
  * gst_pad_set_query_function:
  * @p: a #GstPad of either direction.
@@ -1897,6 +2082,10 @@ gst_pad_set_link_function_full (GstPad * pad, GstPadLinkFunction link,
  *
  * Sets the given unlink function for the pad. It will be called
  * when the pad is unlinked.
+ *
+ * Note that the pad's lock is already held when the unlink
+ * function is called, so most pad functions cannot be called
+ * from within the callback.
  */
 void
 gst_pad_set_unlink_function_full (GstPad * pad, GstPadUnlinkFunction unlink,
@@ -1938,6 +2127,8 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
   g_return_val_if_fail (GST_IS_PAD (sinkpad), FALSE);
   g_return_val_if_fail (GST_PAD_IS_SINK (sinkpad), FALSE);
 
+  GST_TRACER_PAD_UNLINK_PRE (srcpad, sinkpad);
+
   GST_CAT_INFO (GST_CAT_ELEMENT_PADS, "unlinking %s:%s(%p) and %s:%s(%p)",
       GST_DEBUG_PAD_NAME (srcpad), srcpad,
       GST_DEBUG_PAD_NAME (sinkpad), sinkpad);
@@ -2005,6 +2196,7 @@ done:
             GST_STRUCTURE_CHANGE_TYPE_PAD_UNLINK, parent, FALSE));
     gst_object_unref (parent);
   }
+  GST_TRACER_PAD_UNLINK_POST (srcpad, sinkpad, result);
   return result;
 
   /* ERRORS */
@@ -2314,6 +2506,8 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
   g_return_val_if_fail (GST_PAD_IS_SINK (sinkpad),
       GST_PAD_LINK_WRONG_DIRECTION);
 
+  GST_TRACER_PAD_LINK_PRE (srcpad, sinkpad);
+
   /* Notify the parent early. See gst_pad_unlink for details. */
   if (G_LIKELY ((parent = GST_ELEMENT_CAST (gst_pad_get_parent (srcpad))))) {
     if (G_LIKELY (GST_IS_ELEMENT (parent))) {
@@ -2329,8 +2523,12 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
   /* prepare will also lock the two pads */
   result = gst_pad_link_prepare (srcpad, sinkpad, flags);
 
-  if (G_UNLIKELY (result != GST_PAD_LINK_OK))
+  if (G_UNLIKELY (result != GST_PAD_LINK_OK)) {
+    GST_CAT_INFO (GST_CAT_PADS, "link between %s:%s and %s:%s failed: %s",
+        GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad),
+        gst_pad_link_get_name (result));
     goto done;
+  }
 
   /* must set peers before calling the link function */
   GST_PAD_PEER (srcpad) = sinkpad;
@@ -2387,7 +2585,8 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
   GST_CAT_INFO (GST_CAT_PADS, "linked %s:%s and %s:%s, successful",
       GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
 
-  gst_pad_send_event (srcpad, gst_event_new_reconfigure ());
+  if (!(flags & GST_PAD_LINK_CHECK_NO_RECONFIGURE))
+    gst_pad_send_event (srcpad, gst_event_new_reconfigure ());
 
 done:
   if (G_LIKELY (parent)) {
@@ -2397,6 +2596,7 @@ done:
     gst_object_unref (parent);
   }
 
+  GST_TRACER_PAD_LINK_POST (srcpad, sinkpad, result);
   return result;
 
   /* ERRORS */
@@ -2516,7 +2716,8 @@ gst_pad_has_current_caps (GstPad * pad)
  * Gets the capabilities currently configured on @pad with the last
  * #GST_EVENT_CAPS event.
  *
- * Returns: the current caps of the pad with incremented ref-count.
+ * Returns: (transfer full) (nullable): the current caps of the pad with
+ * incremented ref-count or %NULL when pad has no caps. Unref after usage.
  */
 GstCaps *
 gst_pad_get_current_caps (GstPad * pad)
@@ -2562,7 +2763,7 @@ gst_pad_get_pad_template_caps (GstPad * pad)
  * Gets the peer of @pad. This function refs the peer pad so
  * you need to unref it after use.
  *
- * Returns: (transfer full): the peer #GstPad. Unref after usage.
+ * Returns: (transfer full) (nullable): the peer #GstPad. Unref after usage.
  *
  * MT safe.
  */
@@ -2603,30 +2804,41 @@ GstCaps *
 gst_pad_get_allowed_caps (GstPad * pad)
 {
   GstCaps *mycaps;
-  GstCaps *caps;
-  GstPad *peer;
+  GstCaps *caps = NULL;
+  GstQuery *query;
 
   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
 
   GST_OBJECT_LOCK (pad);
-  peer = GST_PAD_PEER (pad);
-  if (G_UNLIKELY (peer == NULL))
+  if (G_UNLIKELY (GST_PAD_PEER (pad) == NULL))
     goto no_peer;
+  GST_OBJECT_UNLOCK (pad);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_PROPERTIES, pad, "getting allowed caps");
 
-  gst_object_ref (peer);
-  GST_OBJECT_UNLOCK (pad);
   mycaps = gst_pad_query_caps (pad, NULL);
 
-  caps = gst_pad_query_caps (peer, mycaps);
-  gst_object_unref (peer);
+  /* Query peer caps */
+  query = gst_query_new_caps (mycaps);
+  if (!gst_pad_peer_query (pad, query)) {
+    GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, "Caps query failed");
+    goto end;
+  }
 
-  gst_caps_unref (mycaps);
+  gst_query_parse_caps_result (query, &caps);
+  if (caps == NULL) {
+    g_warn_if_fail (caps != NULL);
+    goto end;
+  }
+  gst_caps_ref (caps);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, "allowed caps %" GST_PTR_FORMAT,
       caps);
 
+end:
+  gst_query_unref (query);
+  gst_caps_unref (mycaps);
+
   return caps;
 
 no_peer:
@@ -2744,7 +2956,7 @@ no_parent:
   {
     GST_DEBUG_OBJECT (pad, "no parent");
     GST_OBJECT_UNLOCK (pad);
-    return FALSE;
+    return NULL;
   }
 }
 
@@ -2858,7 +3070,7 @@ event_forward_func (GstPad * pad, EventData * data)
  * The EOS event will pause the task associated with @pad before it is forwarded
  * to all internally linked pads,
  *
- * The the event is sent to all pads internally linked to @pad. This function
+ * The event is sent to all pads internally linked to @pad. This function
  * takes ownership of @event.
  *
  * Returns: %TRUE if the event was sent successfully.
@@ -2911,7 +3123,7 @@ static gboolean
 gst_pad_query_accept_caps_default (GstPad * pad, GstQuery * query)
 {
   /* get the caps and see if it intersects to something not empty */
-  GstCaps *caps, *allowed;
+  GstCaps *caps, *allowed = NULL;
   gboolean result;
 
   GST_DEBUG_OBJECT (pad, "query accept-caps %" GST_PTR_FORMAT, query);
@@ -2919,16 +3131,23 @@ gst_pad_query_accept_caps_default (GstPad * pad, GstQuery * query)
   /* first forward the query to internally linked pads when we are dealing with
    * a PROXY CAPS */
   if (GST_PAD_IS_PROXY_CAPS (pad)) {
-    if ((result = gst_pad_proxy_query_accept_caps (pad, query))) {
+    result = gst_pad_proxy_query_accept_caps (pad, query);
+    if (result)
+      allowed = gst_pad_get_pad_template_caps (pad);
+    else
       goto done;
-    }
   }
 
-  GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, pad,
-      "fallback ACCEPT_CAPS query, consider implementing a specialized version");
-
   gst_query_parse_accept_caps (query, &caps);
-  allowed = gst_pad_query_caps (pad, caps);
+  if (!allowed) {
+    if (GST_PAD_IS_ACCEPT_TEMPLATE (pad)) {
+      allowed = gst_pad_get_pad_template_caps (pad);
+    } else {
+      GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, pad,
+          "fallback ACCEPT_CAPS query, consider implementing a specialized version");
+      allowed = gst_pad_query_caps (pad, caps);
+    }
+  }
 
   if (allowed) {
     if (GST_PAD_IS_ACCEPT_INTERSECT (pad)) {
@@ -3024,6 +3243,123 @@ done:
   return TRUE;
 }
 
+/* Default latency implementation */
+typedef struct
+{
+  gboolean live;
+  GstClockTime min, max;
+} LatencyFoldData;
+
+static gboolean
+query_latency_default_fold (const GValue * item, GValue * ret,
+    gpointer user_data)
+{
+  GstPad *pad = g_value_get_object (item), *peer;
+  LatencyFoldData *fold_data = user_data;
+  GstQuery *query;
+  gboolean res = FALSE;
+
+  query = gst_query_new_latency ();
+
+  peer = gst_pad_get_peer (pad);
+  if (peer) {
+    res = gst_pad_peer_query (pad, query);
+  } else {
+    GST_LOG_OBJECT (pad, "No peer pad found, ignoring this pad");
+  }
+
+  if (res) {
+    gboolean live;
+    GstClockTime min, max;
+
+    gst_query_parse_latency (query, &live, &min, &max);
+
+    GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
+        " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
+
+    if (live) {
+      if (min > fold_data->min)
+        fold_data->min = min;
+
+      if (fold_data->max == GST_CLOCK_TIME_NONE)
+        fold_data->max = max;
+      else if (max < fold_data->max)
+        fold_data->max = max;
+
+      fold_data->live = TRUE;
+    }
+  } else if (peer) {
+    GST_DEBUG_OBJECT (pad, "latency query failed");
+    g_value_set_boolean (ret, FALSE);
+  }
+
+  gst_query_unref (query);
+  if (peer)
+    gst_object_unref (peer);
+
+  return TRUE;
+}
+
+static gboolean
+gst_pad_query_latency_default (GstPad * pad, GstQuery * query)
+{
+  GstIterator *it;
+  GstIteratorResult res;
+  GValue ret = G_VALUE_INIT;
+  gboolean query_ret;
+  LatencyFoldData fold_data;
+
+  it = gst_pad_iterate_internal_links (pad);
+  if (!it) {
+    GST_DEBUG_OBJECT (pad, "Can't iterate internal links");
+    return FALSE;
+  }
+
+  g_value_init (&ret, G_TYPE_BOOLEAN);
+
+retry:
+  fold_data.live = FALSE;
+  fold_data.min = 0;
+  fold_data.max = GST_CLOCK_TIME_NONE;
+
+  g_value_set_boolean (&ret, TRUE);
+  res = gst_iterator_fold (it, query_latency_default_fold, &ret, &fold_data);
+  switch (res) {
+    case GST_ITERATOR_OK:
+      g_assert_not_reached ();
+      break;
+    case GST_ITERATOR_DONE:
+      break;
+    case GST_ITERATOR_ERROR:
+      g_value_set_boolean (&ret, FALSE);
+      break;
+    case GST_ITERATOR_RESYNC:
+      gst_iterator_resync (it);
+      goto retry;
+    default:
+      g_assert_not_reached ();
+      break;
+  }
+  gst_iterator_free (it);
+
+  query_ret = g_value_get_boolean (&ret);
+  if (query_ret) {
+    GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
+        " max:%" G_GINT64_FORMAT, fold_data.live ? "true" : "false",
+        fold_data.min, fold_data.max);
+
+    if (fold_data.min > fold_data.max) {
+      GST_ERROR_OBJECT (pad, "minimum latency bigger than maximum latency");
+    }
+
+    gst_query_set_latency (query, fold_data.live, fold_data.min, fold_data.max);
+  } else {
+    GST_LOG_OBJECT (pad, "latency query failed");
+  }
+
+  return query_ret;
+}
+
 typedef struct
 {
   GstQuery *query;
@@ -3079,10 +3415,17 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
       ret = gst_pad_query_caps_default (pad, query);
       forward = FALSE;
       break;
+    case GST_QUERY_LATENCY:
+      ret = gst_pad_query_latency_default (pad, query);
+      forward = FALSE;
+      break;
+    case GST_QUERY_BITRATE:
+      /* FIXME: better default handling */
+      forward = TRUE;
+      break;
     case GST_QUERY_POSITION:
     case GST_QUERY_SEEKING:
     case GST_QUERY_FORMATS:
-    case GST_QUERY_LATENCY:
     case GST_QUERY_JITTER:
     case GST_QUERY_RATE:
     case GST_QUERY_CONVERT:
@@ -3116,6 +3459,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
   return ret;
 }
 
+#define N_STACK_ALLOCATE_PROBES (16)
+
 static void
 probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 {
@@ -3124,27 +3469,71 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
   GstPadProbeType type, flags;
   GstPadProbeCallback callback;
   GstPadProbeReturn ret;
-
-  /* if we have called this callback, do nothing */
-  if (PROBE_COOKIE (hook) == data->cookie) {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u already called", hook->hook_id,
-        PROBE_COOKIE (hook));
-    return;
+  gpointer original_data;
+  guint i;
+
+  /* if we have called this callback, do nothing. But only check
+   * if we're actually calling probes a second time */
+  if (data->retry) {
+    for (i = 0; i < data->n_called_probes; i++) {
+      if (data->called_probes[i] == hook->hook_id) {
+        GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+            "hook %lu already called", hook->hook_id);
+        return;
+      }
+    }
   }
 
-  PROBE_COOKIE (hook) = data->cookie;
+  /* reallocate on the heap if we had more than 16 probes */
+  if (data->n_called_probes == data->called_probes_size) {
+    if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) {
+      data->called_probes_size *= 2;
+      data->called_probes =
+          g_renew (gulong, data->called_probes, data->called_probes_size);
+    } else {
+      gulong *tmp = data->called_probes;
+
+      data->called_probes_size *= 2;
+      data->called_probes = g_new (gulong, data->called_probes_size);
+      memcpy (data->called_probes, tmp,
+          N_STACK_ALLOCATE_PROBES * sizeof (gulong));
+    }
+  }
+  data->called_probes[data->n_called_probes++] = hook->hook_id;
 
   flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
   type = info->type;
+  original_data = info->data;
 
-  /* one of the data types for non-idle probes */
-  if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0
-      && (flags & GST_PAD_PROBE_TYPE_ALL_BOTH & type) == 0)
-    goto no_match;
   /* one of the scheduling types */
   if ((flags & GST_PAD_PROBE_TYPE_SCHEDULING & type) == 0)
     goto no_match;
+
+  if (G_UNLIKELY (data->handled)) {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "probe previously returned HANDLED, not calling again");
+    goto no_match;
+  } else if (G_UNLIKELY (data->dropped)) {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "probe previously returned DROPPED, not calling again");
+    goto no_match;
+  }
+
+  if (type & GST_PAD_PROBE_TYPE_PUSH) {
+    /* one of the data types for non-idle probes */
+    if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0
+        && (flags & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH & type) == 0)
+      goto no_match;
+  } else if (type & GST_PAD_PROBE_TYPE_PULL) {
+    /* one of the data types for non-idle probes */
+    if ((type & GST_PAD_PROBE_TYPE_BLOCKING) == 0
+        && (flags & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH & type) == 0)
+      goto no_match;
+  } else {
+    /* Type must have PULL or PUSH probe types */
+    g_assert_not_reached ();
+  }
+
   /* one of the blocking types must match */
   if ((type & GST_PAD_PROBE_TYPE_BLOCKING) &&
       (flags & GST_PAD_PROBE_TYPE_BLOCKING & type) == 0)
@@ -3158,8 +3547,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
     goto no_match;
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id,
-      PROBE_COOKIE (hook), flags);
+      "hook %lu with flags 0x%08x matches", hook->hook_id, flags);
 
   data->marshalled = TRUE;
 
@@ -3169,12 +3557,25 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 
   info->id = hook->hook_id;
 
+  if ((flags & GST_PAD_PROBE_TYPE_IDLE))
+    pad->priv->idle_running++;
+
   GST_OBJECT_UNLOCK (pad);
 
   ret = callback (pad, info, hook->data);
 
   GST_OBJECT_LOCK (pad);
 
+  if ((flags & GST_PAD_PROBE_TYPE_IDLE))
+    pad->priv->idle_running--;
+
+  if (ret != GST_PAD_PROBE_HANDLED && original_data != NULL
+      && info->data == NULL) {
+    GST_DEBUG_OBJECT (pad, "data item in pad probe info was dropped");
+    info->type = GST_PAD_PROBE_TYPE_INVALID;
+    data->dropped = TRUE;
+  }
+
   switch (ret) {
     case GST_PAD_PROBE_REMOVE:
       /* remove the probe */
@@ -3188,6 +3589,10 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
       info->type = GST_PAD_PROBE_TYPE_INVALID;
       data->dropped = TRUE;
       break;
+    case GST_PAD_PROBE_HANDLED:
+      GST_DEBUG_OBJECT (pad, "probe handled data");
+      data->handled = TRUE;
+      break;
     case GST_PAD_PROBE_PASS:
       /* inform the pad block to let things pass */
       GST_DEBUG_OBJECT (pad, "asked to pass item");
@@ -3205,8 +3610,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 no_match:
   {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u with flags 0x%08x does not match %08x",
-        hook->hook_id, PROBE_COOKIE (hook), flags, info->type);
+        "hook %lu with flags 0x%08x does not match %08x",
+        hook->hook_id, flags, info->type);
     return;
   }
 }
@@ -3215,32 +3620,74 @@ no_match:
 #define PROBE_NO_DATA(pad,mask,label,defaultval)                \
   G_STMT_START {                                               \
     if (G_UNLIKELY (pad->num_probes)) {                                \
+      GstFlowReturn pval = defaultval;                         \
       /* pass NULL as the data item */                          \
-      GstPadProbeInfo info = { mask, 0, NULL, 0, 0 };           \
+      GstPadProbeInfo info = { mask, 0, NULL, 0, 0 };          \
+      info.ABI.abi.flow_ret = defaultval;                      \
       ret = do_probe_callbacks (pad, &info, defaultval);       \
-      if (G_UNLIKELY (ret != defaultval && ret != GST_FLOW_OK))        \
+      if (G_UNLIKELY (ret != pval && ret != GST_FLOW_OK))      \
         goto label;                                            \
     }                                                          \
   } G_STMT_END
 
-#define PROBE_FULL(pad,mask,data,offs,size,label)               \
-  G_STMT_START {                                               \
-    if (G_UNLIKELY (pad->num_probes)) {                                \
-      /* pass the data item */                                  \
-      GstPadProbeInfo info = { mask, 0, data, offs, size };     \
-      ret = do_probe_callbacks (pad, &info, GST_FLOW_OK);      \
-      /* store the possibly updated data item */                \
-      data = GST_PAD_PROBE_INFO_DATA (&info);                   \
-      /* if something went wrong, exit */                       \
-      if (G_UNLIKELY (ret != GST_FLOW_OK))                     \
-        goto label;                                            \
-    }                                                          \
+#define PROBE_FULL(pad,mask,data,offs,size,label,handleable,handle_label) \
+  G_STMT_START {                                                       \
+    if (G_UNLIKELY (pad->num_probes)) {                                        \
+      /* pass the data item */                                         \
+      GstPadProbeInfo info = { mask, 0, data, offs, size };            \
+      info.ABI.abi.flow_ret = GST_FLOW_OK;                             \
+      ret = do_probe_callbacks (pad, &info, GST_FLOW_OK);              \
+      /* store the possibly updated data item */                       \
+      data = GST_PAD_PROBE_INFO_DATA (&info);                          \
+      /* if something went wrong, exit */                              \
+      if (G_UNLIKELY (ret != GST_FLOW_OK)) {                           \
+       if (handleable && ret == GST_FLOW_CUSTOM_SUCCESS_1) {           \
+         ret = info.ABI.abi.flow_ret;                                          \
+         goto handle_label;                                            \
+       }                                                               \
+       goto label;                                                     \
+      }                                                                        \
+    }                                                                  \
   } G_STMT_END
 
-#define PROBE_PUSH(pad,mask,data,label)                                 \
-  PROBE_FULL(pad, mask, data, -1, -1, label);
-#define PROBE_PULL(pad,mask,data,offs,size,label)               \
-  PROBE_FULL(pad, mask, data, offs, size, label);
+#define PROBE_PUSH(pad,mask,data,label)                \
+  PROBE_FULL(pad, mask, data, -1, -1, label, FALSE, label);
+#define PROBE_HANDLE(pad,mask,data,label,handle_label) \
+  PROBE_FULL(pad, mask, data, -1, -1, label, TRUE, handle_label);
+#define PROBE_PULL(pad,mask,data,offs,size,label)              \
+  PROBE_FULL(pad, mask, data, offs, size, label, FALSE, label);
+
+static GstFlowReturn
+do_pad_idle_probe_wait (GstPad * pad)
+{
+  while (GST_PAD_IS_RUNNING_IDLE_PROBE (pad)) {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "waiting idle probe to be removed");
+    GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_BLOCKING);
+    GST_PAD_BLOCK_WAIT (pad);
+    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
+
+    if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+      return GST_FLOW_FLUSHING;
+  }
+  return GST_FLOW_OK;
+}
+
+#define PROBE_TYPE_IS_SERIALIZED(i) \
+    ( \
+      ( \
+        (((i)->type & (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | \
+        GST_PAD_PROBE_TYPE_EVENT_FLUSH)) && \
+        GST_EVENT_IS_SERIALIZED ((i)->data)) \
+      ) || ( \
+        (((i)->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) && \
+        GST_QUERY_IS_SERIALIZED ((i)->data)) \
+      ) || ( \
+        ((i)->type & (GST_PAD_PROBE_TYPE_BUFFER | \
+        GST_PAD_PROBE_TYPE_BUFFER_LIST))  \
+      ) \
+    )
 
 static GstFlowReturn
 do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
@@ -3249,30 +3696,44 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   ProbeMarshall data;
   guint cookie;
   gboolean is_block;
+  gulong called_probes[N_STACK_ALLOCATE_PROBES];
 
   data.pad = pad;
   data.info = info;
   data.pass = FALSE;
+  data.handled = FALSE;
   data.marshalled = FALSE;
   data.dropped = FALSE;
-  data.cookie = ++pad->priv->probe_cookie;
+
+  /* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed,
+   * we will re-allocate with g_malloc(). This should usually never be needed
+   */
+  data.called_probes = called_probes;
+  data.n_called_probes = 0;
+  data.called_probes_size = N_STACK_ALLOCATE_PROBES;
+  data.retry = FALSE;
 
   is_block =
       (info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK;
 
+  if (is_block && PROBE_TYPE_IS_SERIALIZED (info)) {
+    if (do_pad_idle_probe_wait (pad) == GST_FLOW_FLUSHING)
+      goto flushing;
+  }
+
 again:
-  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "do probes cookie %u", data.cookie);
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes");
   cookie = pad->priv->probe_list_cookie;
 
   g_hook_list_marshal (&pad->probes, TRUE,
       (GHookMarshaller) probe_hook_marshal, &data);
 
-  /* if the list changed, call the new callbacks (they will not have their
-   * cookie set to data.cookie */
+  /* if the list changed, call the new callbacks (they will not be in
+   * called_probes yet) */
   if (cookie != pad->priv->probe_list_cookie) {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
         "probe list changed, restarting");
+    data.retry = TRUE;
     goto again;
   }
 
@@ -3280,6 +3741,11 @@ again:
   if (data.dropped)
     goto dropped;
 
+  /* If one handler took care of it, let the the item pass */
+  if (data.handled) {
+    goto handled;
+  }
+
   /* if no handler matched and we are blocking, let the item pass */
   if (!data.marshalled && is_block)
     goto passed;
@@ -3309,11 +3775,12 @@ again:
       GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
       GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
 
-      /* if the list changed, call the new callbacks (they will not have their
-       * cookie set to data.cookie */
+      /* if the list changed, call the new callbacks (they will not be in
+       * called_probes yet) */
       if (cookie != pad->priv->probe_list_cookie) {
         GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
             "probe list changed, restarting");
+        data.retry = TRUE;
         goto again;
       }
 
@@ -3322,25 +3789,41 @@ again:
     }
   }
 
+  if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+    g_free (data.called_probes);
+
   return defaultval;
 
   /* ERRORS */
 flushing:
   {
     GST_DEBUG_OBJECT (pad, "pad is flushing");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_FLUSHING;
   }
 dropped:
   {
     GST_DEBUG_OBJECT (pad, "data is dropped");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_CUSTOM_SUCCESS;
   }
 passed:
   {
     /* FIXME : Should we return FLOW_OK or the defaultval ?? */
     GST_DEBUG_OBJECT (pad, "data is passed");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_OK;
   }
+handled:
+  {
+    GST_DEBUG_OBJECT (pad, "data was handled");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
+    return GST_FLOW_CUSTOM_SUCCESS_1;
+  }
 }
 
 /* pad offsets */
@@ -3393,7 +3876,8 @@ gst_pad_set_offset (GstPad * pad, gint64 offset)
     goto done;
 
   pad->offset = offset;
-  GST_DEBUG_OBJECT (pad, "changed offset to %" G_GINT64_FORMAT, offset);
+  GST_DEBUG_OBJECT (pad, "changed offset to %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (offset));
 
   /* resend all sticky events with updated offset on next buffer push */
   events_foreach (pad, mark_event_not_received, NULL);
@@ -3442,6 +3926,8 @@ push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data)
   } else {
     data->ret = gst_pad_push_event_unchecked (pad, gst_event_ref (event),
         GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
+    if (data->ret == GST_FLOW_CUSTOM_SUCCESS_1)
+      data->ret = GST_FLOW_OK;
   }
 
   switch (data->ret) {
@@ -3466,11 +3952,12 @@ push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data)
       break;
     case GST_FLOW_NOT_LINKED:
       /* not linked is not a problem, we are sticky so the event will be
-       * sent later but only for non-EOS events */
+       * rescheduled to be sent later on re-link, but only for non-EOS events */
       GST_DEBUG_OBJECT (pad, "pad was not linked, mark pending");
-      if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
+      if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) {
         data->ret = GST_FLOW_OK;
-      GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+        ev->received = TRUE;
+      }
       break;
     default:
       GST_DEBUG_OBJECT (pad, "result %s, mark pending events",
@@ -3565,6 +4052,7 @@ gst_pad_query (GstPad * pad, GstQuery * query)
 
   GST_DEBUG_OBJECT (pad, "doing query %p (%s)", query,
       GST_QUERY_TYPE_NAME (query));
+  GST_TRACER_PAD_QUERY_PRE (pad, query);
 
   serialized = GST_QUERY_IS_SERIALIZED (query);
   if (G_UNLIKELY (serialized))
@@ -3587,6 +4075,7 @@ gst_pad_query (GstPad * pad, GstQuery * query)
 
   GST_DEBUG_OBJECT (pad, "sent query %p (%s), result %d", query,
       GST_QUERY_TYPE_NAME (query), res);
+  GST_TRACER_PAD_QUERY_POST (pad, query, res);
 
   if (res != TRUE)
     goto query_failed;
@@ -3642,12 +4131,12 @@ probe_stopped:
     if (G_UNLIKELY (serialized))
       GST_PAD_STREAM_UNLOCK (pad);
 
-    /* if a probe dropped, we don't sent it further but assume that the probe
-     * answered the query and return TRUE */
-    if (ret == GST_FLOW_CUSTOM_SUCCESS)
-      res = TRUE;
-    else
+    /* if a probe dropped without handling, we don't sent it further but assume
+     * that the probe did not answer the query and return FALSE */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
       res = FALSE;
+    else
+      res = TRUE;
 
     return res;
   }
@@ -3759,12 +4248,12 @@ probe_stopped:
     GST_DEBUG_OBJECT (pad, "probe stopped: %s", gst_flow_get_name (ret));
     GST_OBJECT_UNLOCK (pad);
 
-    /* if a probe dropped, we don't sent it further but assume that the probe
-     * answered the query and return TRUE */
-    if (ret == GST_FLOW_CUSTOM_SUCCESS)
-      res = TRUE;
-    else
+    /* if a probe dropped without handling, we don't sent it further but
+     * assume that the probe did not answer the query and return FALSE */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
       res = FALSE;
+    else
+      res = TRUE;
 
     return res;
   }
@@ -3782,6 +4271,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
 {
   GstFlowReturn ret;
   GstObject *parent;
+  gboolean handled = FALSE;
 
   GST_PAD_STREAM_LOCK (pad);
 
@@ -3795,7 +4285,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
   if (G_UNLIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_PUSH))
     goto wrong_mode;
 
-#ifndef G_DISABLE_ASSERT
+#ifdef GST_ENABLE_EXTRA_CHECKS
   if (G_UNLIKELY (pad->priv->last_cookie != pad->priv->events_cookie)) {
     if (!find_event_by_type (pad, GST_EVENT_STREAM_START, 0)) {
       g_warning (G_STRLOC
@@ -3811,11 +4301,12 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
   }
 #endif
 
-  PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+  PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+      probe_handled);
 
-  PROBE_PUSH (pad, type, data, probe_stopped);
+  PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
 
-  parent = GST_OBJECT_PARENT (pad);
+  ACQUIRE_PARENT (pad, parent, no_parent);
   GST_OBJECT_UNLOCK (pad);
 
   /* NOTE: we read the chainfunc unlocked.
@@ -3829,13 +4320,13 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
     if (G_UNLIKELY ((chainfunc = GST_PAD_CHAINFUNC (pad)) == NULL))
       goto no_function;
 
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+    GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad,
         "calling chainfunction &%s with buffer %" GST_PTR_FORMAT,
         GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_BUFFER (data));
 
     ret = chainfunc (pad, parent, GST_BUFFER_CAST (data));
 
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+    GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad,
         "called chainfunction &%s with buffer %p, returned %s",
         GST_DEBUG_FUNCPTR_NAME (chainfunc), data, gst_flow_get_name (ret));
   } else {
@@ -3844,17 +4335,19 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
     if (G_UNLIKELY ((chainlistfunc = GST_PAD_CHAINLISTFUNC (pad)) == NULL))
       goto no_function;
 
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+    GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad,
         "calling chainlistfunction &%s",
         GST_DEBUG_FUNCPTR_NAME (chainlistfunc));
 
     ret = chainlistfunc (pad, parent, GST_BUFFER_LIST_CAST (data));
 
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+    GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad,
         "called chainlistfunction &%s, returned %s",
         GST_DEBUG_FUNCPTR_NAME (chainlistfunc), gst_flow_get_name (ret));
   }
 
+  RELEASE_PARENT (parent);
+
   GST_PAD_STREAM_UNLOCK (pad);
 
   return ret;
@@ -3886,15 +4379,21 @@ wrong_mode:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return GST_FLOW_ERROR;
   }
+probe_handled:
+  handled = TRUE;
+  /* PASSTHROUGH */
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
     GST_PAD_STREAM_UNLOCK (pad);
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    /* We unref the buffer, except if the probe handled it (CUSTOM_SUCCESS_1) */
+    if (!handled)
+      gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
 
     switch (ret) {
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped buffer");
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
         ret = GST_FLOW_OK;
         break;
       default:
@@ -3903,8 +4402,17 @@ probe_stopped:
     }
     return ret;
   }
+no_parent:
+  {
+    GST_DEBUG_OBJECT (pad, "No parent when chaining %" GST_PTR_FORMAT, data);
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    GST_OBJECT_UNLOCK (pad);
+    GST_PAD_STREAM_UNLOCK (pad);
+    return GST_FLOW_FLUSHING;
+  }
 no_function:
   {
+    RELEASE_PARENT (parent);
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     g_critical ("chain on pad %s:%s but it has no chainfunction",
         GST_DEBUG_PAD_NAME (pad));
@@ -3958,7 +4466,7 @@ gst_pad_chain_list_default (GstPad * pad, GstObject * parent,
   GstBuffer *buffer;
   GstFlowReturn ret;
 
-  GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
+  GST_INFO_OBJECT (pad, "chaining each buffer in list individually");
 
   len = gst_buffer_list_length (list);
 
@@ -4018,6 +4526,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
 {
   GstPad *peer;
   GstFlowReturn ret;
+  gboolean handled = FALSE;
 
   GST_OBJECT_LOCK (pad);
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
@@ -4029,7 +4538,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
   if (G_UNLIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_PUSH))
     goto wrong_mode;
 
-#ifndef G_DISABLE_ASSERT
+#ifdef GST_ENABLE_EXTRA_CHECKS
   if (G_UNLIKELY (pad->priv->last_cookie != pad->priv->events_cookie)) {
     if (!find_event_by_type (pad, GST_EVENT_STREAM_START, 0)) {
       g_warning (G_STRLOC
@@ -4049,14 +4558,19 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
     goto events_error;
 
   /* do block probes */
-  PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+  PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+      probe_handled);
 
   /* recheck sticky events because the probe might have cause a relink */
   if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK)
     goto events_error;
 
   /* do post-blocking probes */
-  PROBE_PUSH (pad, type, data, probe_stopped);
+  PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
+
+  /* recheck sticky events because the probe might have cause a relink */
+  if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK)
+    goto events_error;
 
   if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
     goto not_linked;
@@ -4067,6 +4581,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
   GST_OBJECT_UNLOCK (pad);
 
   ret = gst_pad_chain_data_unchecked (peer, type, data);
+  data = NULL;
 
   gst_object_unref (peer);
 
@@ -4119,22 +4634,26 @@ events_error:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return ret;
   }
+probe_handled:
+  handled = TRUE;
+  /* PASSTHROUGH */
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
-    pad->ABI.abi.last_flowret =
-        ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret;
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    if (data != NULL && !handled)
+      gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
 
     switch (ret) {
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped buffer");
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
         ret = GST_FLOW_OK;
         break;
       default:
         GST_DEBUG_OBJECT (pad, "an error occurred %s", gst_flow_get_name (ret));
         break;
     }
+    pad->ABI.abi.last_flowret = ret;
     return ret;
   }
 not_linked:
@@ -4173,12 +4692,17 @@ not_linked:
 GstFlowReturn
 gst_pad_push (GstPad * pad, GstBuffer * buffer)
 {
+  GstFlowReturn res;
+
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
-  return gst_pad_push_data (pad,
+  GST_TRACER_PAD_PUSH_PRE (pad, buffer);
+  res = gst_pad_push_data (pad,
       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer);
+  GST_TRACER_PAD_PUSH_POST (pad, res);
+  return res;
 }
 
 /**
@@ -4208,12 +4732,17 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
 GstFlowReturn
 gst_pad_push_list (GstPad * pad, GstBufferList * list)
 {
+  GstFlowReturn res;
+
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER_LIST (list), GST_FLOW_ERROR);
 
-  return gst_pad_push_data (pad,
+  GST_TRACER_PAD_PUSH_LIST_PRE (pad, list);
+  res = gst_pad_push_data (pad,
       GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH, list);
+  GST_TRACER_PAD_PUSH_LIST_POST (pad, res);
+  return res;
 }
 
 static GstFlowReturn
@@ -4477,6 +5006,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   g_return_val_if_fail (*buffer == NULL || (GST_IS_BUFFER (*buffer)
           && gst_buffer_get_size (*buffer) >= size), GST_FLOW_ERROR);
 
+  GST_TRACER_PAD_PULL_RANGE_PRE (pad, offset, size);
+
   GST_OBJECT_LOCK (pad);
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
@@ -4523,6 +5054,7 @@ probed_data:
 
   *buffer = res_buf;
 
+  GST_TRACER_PAD_PULL_RANGE_POST (pad, *buffer, ret);
   return ret;
 
   /* ERROR recovery here */
@@ -4532,7 +5064,8 @@ flushing:
         "pullrange, but pad was flushing");
     pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
     GST_OBJECT_UNLOCK (pad);
-    return GST_FLOW_FLUSHING;
+    ret = GST_FLOW_FLUSHING;
+    goto done;
   }
 wrong_mode:
   {
@@ -4540,7 +5073,8 @@ wrong_mode:
         GST_DEBUG_PAD_NAME (pad));
     pad->ABI.abi.last_flowret = GST_FLOW_ERROR;
     GST_OBJECT_UNLOCK (pad);
-    return GST_FLOW_ERROR;
+    ret = GST_FLOW_ERROR;
+    goto done;
   }
 probe_stopped:
   {
@@ -4561,7 +5095,7 @@ probe_stopped:
     }
     pad->ABI.abi.last_flowret = ret;
     GST_OBJECT_UNLOCK (pad);
-    return ret;
+    goto done;
   }
 not_linked:
   {
@@ -4569,7 +5103,8 @@ not_linked:
         "pulling range, but it was not linked");
     pad->ABI.abi.last_flowret = GST_FLOW_NOT_LINKED;
     GST_OBJECT_UNLOCK (pad);
-    return GST_FLOW_NOT_LINKED;
+    ret = GST_FLOW_NOT_LINKED;
+    goto done;
   }
 pull_range_failed:
   {
@@ -4578,7 +5113,7 @@ pull_range_failed:
     GST_CAT_LEVEL_LOG (GST_CAT_SCHEDULING,
         (ret >= GST_FLOW_EOS) ? GST_LEVEL_INFO : GST_LEVEL_WARNING,
         pad, "pullrange failed, flow: %s", gst_flow_get_name (ret));
-    return ret;
+    goto done;
   }
 probe_stopped_unref:
   {
@@ -4594,8 +5129,11 @@ probe_stopped_unref:
 
     if (*buffer == NULL)
       gst_buffer_unref (res_buf);
-    return ret;
+    goto done;
   }
+done:
+  GST_TRACER_PAD_PULL_RANGE_POST (pad, NULL, ret);
+  return ret;
 }
 
 /* must be called with pad object lock */
@@ -4620,6 +5158,15 @@ store_sticky_event (GstPad * pad, GstEvent * event)
                   || type == GST_EVENT_EOS))))
     goto flushed;
 
+  /* Unset the EOS flag when received STREAM_START event, so pad can
+   * store sticky event and then push it later */
+  if (type == GST_EVENT_STREAM_START) {
+    GST_LOG_OBJECT (pad, "Removing pending EOS and StreamGroupDone events");
+    remove_event_by_type (pad, GST_EVENT_EOS);
+    remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE);
+    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS);
+  }
+
   if (G_UNLIKELY (GST_PAD_IS_EOS (pad)))
     goto eos;
 
@@ -4712,7 +5259,7 @@ eos:
 /**
  * gst_pad_store_sticky_event:
  * @pad: a #GstPad
- * @event: a #GstEvent
+ * @event: (transfer none): a #GstEvent
  *
  * Store the sticky @event on @pad
  *
@@ -4757,6 +5304,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
   GstFlowReturn ret;
   GstPad *peerpad;
   GstEventType event_type;
+  gint64 old_pad_offset = pad->offset;
 
   /* pass the adjusted event on. We need to do this even if
    * there is no peer pad because of the probes. */
@@ -4782,6 +5330,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
       /* Remove sticky EOS events */
       GST_LOG_OBJECT (pad, "Removing pending EOS events");
       remove_event_by_type (pad, GST_EVENT_EOS);
+      remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE);
       remove_event_by_type (pad, GST_EVENT_SEGMENT);
       GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS);
       pad->ABI.abi.last_flowret = GST_FLOW_OK;
@@ -4808,6 +5357,17 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
       }
       PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH |
           GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped);
+      /* recheck sticky events because the probe might have cause a relink */
+      if (GST_PAD_HAS_PENDING_EVENTS (pad) && GST_PAD_IS_SRC (pad)
+          && (GST_EVENT_IS_SERIALIZED (event)
+              || GST_EVENT_IS_STICKY (event))) {
+        PushStickyData data = { GST_FLOW_OK, FALSE, event };
+        GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+
+        /* Push all sticky events before our current one
+         * that have changed */
+        events_foreach (pad, sticky_changed, &data);
+      }
       break;
     }
   }
@@ -4827,6 +5387,15 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
     events_foreach (pad, sticky_changed, &data);
   }
 
+  /* the pad offset might've been changed by any of the probes above. It
+   * would've been taken into account when repushing any of the sticky events
+   * above but not for our current event here */
+  if (G_UNLIKELY (old_pad_offset != pad->offset)) {
+    event =
+        _apply_pad_offset (pad, event, GST_PAD_IS_SINK (pad),
+        pad->offset - old_pad_offset);
+  }
+
   /* now check the peer pad */
   peerpad = GST_PAD_PEER (pad);
   if (peerpad == NULL)
@@ -4844,7 +5413,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
   /* Note: we gave away ownership of the event at this point but we can still
    * print the old pointer */
   GST_LOG_OBJECT (pad,
-      "sent event %p to (%s) peerpad %" GST_PTR_FORMAT ", ret %s", event,
+      "sent event %p (%s) to peerpad %" GST_PTR_FORMAT ", ret %s", event,
       gst_event_type_get_name (event_type), peerpad, gst_flow_get_name (ret));
 
   gst_object_unref (peerpad);
@@ -4874,9 +5443,13 @@ inactive:
 probe_stopped:
   {
     GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
-    gst_event_unref (event);
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      gst_event_unref (event);
 
     switch (ret) {
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "handled event");
+        break;
       case GST_FLOW_CUSTOM_SUCCESS:
         GST_DEBUG_OBJECT (pad, "dropped event");
         break;
@@ -4932,6 +5505,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (GST_IS_EVENT (event), FALSE);
 
+  GST_TRACER_PAD_PUSH_EVENT_PRE (pad, event);
+
   if (GST_PAD_IS_SRC (pad)) {
     if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event)))
       goto wrong_direction;
@@ -4972,7 +5547,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
     /* other events are pushed right away */
     ret = gst_pad_push_event_unchecked (pad, event, type);
     /* dropped events by a probe are not an error */
-    res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS);
+    res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS
+        || ret == GST_FLOW_CUSTOM_SUCCESS_1);
   } else {
     /* Errors in sticky event pushing are no problem and ignored here
      * as they will cause more meaningful errors during data flow.
@@ -4985,6 +5561,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   }
   GST_OBJECT_UNLOCK (pad);
 
+  GST_TRACER_PAD_PUSH_EVENT_POST (pad, res);
   return res;
 
   /* ERROR handling */
@@ -4993,28 +5570,31 @@ wrong_direction:
     g_warning ("pad %s:%s pushing %s event in wrong direction",
         GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event));
     gst_event_unref (event);
-    return FALSE;
+    goto done;
   }
 unknown_direction:
   {
     g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad));
     gst_event_unref (event);
-    return FALSE;
+    goto done;
   }
 flushed:
   {
     GST_DEBUG_OBJECT (pad, "We're flushing");
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
-    return FALSE;
+    goto done;
   }
 eos:
   {
     GST_DEBUG_OBJECT (pad, "We're EOS");
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
-    return FALSE;
+    goto done;
   }
+done:
+  GST_TRACER_PAD_PUSH_EVENT_POST (pad, FALSE);
+  return FALSE;
 }
 
 /* Check if we can call the event function with the given event */
@@ -5055,10 +5635,13 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
   GstEventType event_type;
   gboolean serialized, need_unlock = FALSE, sticky;
   GstPadEventFunction eventfunc;
+  GstPadEventFullFunction eventfullfunc = NULL;
   GstObject *parent;
+  gint64 old_pad_offset;
 
   GST_OBJECT_LOCK (pad);
 
+  old_pad_offset = pad->offset;
   event = apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad));
 
   if (GST_PAD_IS_SINK (pad))
@@ -5078,6 +5661,8 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
 
       GST_PAD_SET_FLUSHING (pad);
       GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag");
+      GST_PAD_BLOCK_BROADCAST (pad);
+      type |= GST_PAD_PROBE_TYPE_EVENT_FLUSH;
       break;
     case GST_EVENT_FLUSH_STOP:
       /* we can't accept flush-stop on inactive pads else the flushing flag
@@ -5092,6 +5677,7 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
       /* Remove pending EOS events */
       GST_LOG_OBJECT (pad, "Removing pending EOS and SEGMENT events");
       remove_event_by_type (pad, GST_EVENT_EOS);
+      remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE);
       remove_event_by_type (pad, GST_EVENT_SEGMENT);
       GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS);
       pad->ABI.abi.last_flowret = GST_FLOW_OK;
@@ -5114,6 +5700,18 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
       if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
         goto flushing;
 
+      switch (event_type) {
+        case GST_EVENT_STREAM_START:
+          /* Remove sticky EOS events */
+          GST_LOG_OBJECT (pad, "Removing pending EOS events");
+          remove_event_by_type (pad, GST_EVENT_EOS);
+          remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE);
+          GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS);
+          break;
+        default:
+          break;
+      }
+
       if (serialized) {
         if (G_UNLIKELY (GST_PAD_IS_EOS (pad)))
           goto eos;
@@ -5139,7 +5737,18 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
 
   PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH, event, probe_stopped);
 
-  if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL))
+  /* the pad offset might've been changed by any of the probes above. It
+   * would've been taken into account when repushing any of the sticky events
+   * above but not for our current event here */
+  if (G_UNLIKELY (old_pad_offset != pad->offset)) {
+    event =
+        _apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad),
+        pad->offset - old_pad_offset);
+  }
+
+  eventfullfunc = GST_PAD_EVENTFULLFUNC (pad);
+  eventfunc = GST_PAD_EVENTFUNC (pad);
+  if (G_UNLIKELY (eventfunc == NULL && eventfullfunc == NULL))
     goto no_function;
 
   ACQUIRE_PARENT (pad, parent, no_parent);
@@ -5152,7 +5761,9 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
   if (sticky)
     gst_event_ref (event);
 
-  if (eventfunc (pad, parent, event)) {
+  if (eventfullfunc) {
+    ret = eventfullfunc (pad, parent, event);
+  } else if (eventfunc (pad, parent, event)) {
     ret = GST_FLOW_OK;
   } else {
     /* something went wrong */
@@ -5228,11 +5839,14 @@ probe_stopped:
     GST_OBJECT_UNLOCK (pad);
     if (need_unlock)
       GST_PAD_STREAM_UNLOCK (pad);
-    gst_event_unref (event);
+    /* Only unref if unhandled */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      gst_event_unref (event);
 
     switch (ret) {
+      case GST_FLOW_CUSTOM_SUCCESS_1:
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped event");
+        GST_DEBUG_OBJECT (pad, "dropped or handled event");
         ret = GST_FLOW_OK;
         break;
       default:
@@ -5364,7 +5978,7 @@ gst_pad_set_element_private (GstPad * pad, gpointer priv)
  * Gets the private data of a pad.
  * No locking is performed in this function.
  *
- * Returns: (transfer none): a #gpointer to the private data.
+ * Returns: (transfer none) (nullable): a #gpointer to the private data.
  */
 gpointer
 gst_pad_get_element_private (GstPad * pad)
@@ -5596,6 +6210,9 @@ gst_pad_pause_task (GstPad * pad)
   if (task == NULL)
     goto no_task;
   res = gst_task_set_state (task, GST_TASK_PAUSED);
+  /* unblock activation waits if any */
+  pad->priv->in_activation = FALSE;
+  g_cond_broadcast (&pad->priv->activation_cond);
   GST_OBJECT_UNLOCK (pad);
 
   /* wait for task function to finish, this lock is recursive so it does nothing
@@ -5614,6 +6231,42 @@ no_task:
 }
 
 /**
+ * gst_pad_get_task_state:
+ * @pad: the #GstPad to get task state from
+ *
+ * Get @pad task state. If no task is currently
+ * set, #GST_TASK_STOPPED is returned.
+ *
+ * Returns: The current state of @pad's task.
+ *
+ * Since: 1.12
+ */
+GstTaskState
+gst_pad_get_task_state (GstPad * pad)
+{
+  GstTask *task;
+  GstTaskState res;
+
+  g_return_val_if_fail (GST_IS_PAD (pad), GST_TASK_STOPPED);
+
+  GST_OBJECT_LOCK (pad);
+  task = GST_PAD_TASK (pad);
+  if (task == NULL)
+    goto no_task;
+  res = gst_task_get_state (task);
+  GST_OBJECT_UNLOCK (pad);
+
+  return res;
+
+no_task:
+  {
+    GST_DEBUG_OBJECT (pad, "pad has no task");
+    GST_OBJECT_UNLOCK (pad);
+    return GST_TASK_STOPPED;
+  }
+}
+
+/**
  * gst_pad_stop_task:
  * @pad: the #GstPad to stop the task of
  *
@@ -5645,6 +6298,9 @@ gst_pad_stop_task (GstPad * pad)
     goto no_task;
   GST_PAD_TASK (pad) = NULL;
   res = gst_task_set_state (task, GST_TASK_STOPPED);
+  /* unblock activation waits if any */
+  pad->priv->in_activation = FALSE;
+  g_cond_broadcast (&pad->priv->activation_cond);
   GST_OBJECT_UNLOCK (pad);
 
   GST_PAD_STREAM_LOCK (pad);
@@ -5688,7 +6344,7 @@ join_failed:
  * gst_pad_probe_info_get_event:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstEvent from the probe
+ * Returns: (transfer none) (nullable): The #GstEvent from the probe
  */
 
 GstEvent *
@@ -5705,7 +6361,7 @@ gst_pad_probe_info_get_event (GstPadProbeInfo * info)
  * gst_pad_probe_info_get_query:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstQuery from the probe
+ * Returns: (transfer none) (nullable): The #GstQuery from the probe
  */
 
 GstQuery *
@@ -5721,7 +6377,7 @@ gst_pad_probe_info_get_query (GstPadProbeInfo * info)
  * gst_pad_probe_info_get_buffer:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstBuffer from the probe
+ * Returns: (transfer none) (nullable): The #GstBuffer from the probe
  */
 
 GstBuffer *
@@ -5733,10 +6389,10 @@ gst_pad_probe_info_get_buffer (GstPadProbeInfo * info)
 }
 
 /**
- * gst_pad_probe_info_get_bufferlist:
+ * gst_pad_probe_info_get_buffer_list:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstBufferlist from the probe
+ * Returns: (transfer none) (nullable): The #GstBufferList from the probe
  */
 
 GstBufferList *