element: Enforce that elements created by gst_element_factory_create/make() are floating
[platform/upstream/gstreamer.git] / gst / gstpad.c
index fa28b41..dcd52f2 100644 (file)
@@ -121,9 +121,6 @@ enum
       /* FILL ME */
 };
 
-#define GST_PAD_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PAD, GstPadPrivate))
-
 #define _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH (GST_PAD_PROBE_TYPE_ALL_BOTH | GST_PAD_PROBE_TYPE_EVENT_FLUSH)
 
 /* we have a pending and an active event on the pad. On source pads only the
@@ -143,21 +140,23 @@ struct _GstPadPrivate
 
   gint using;
   guint probe_list_cookie;
-  guint probe_cookie;
 
   /* counter of how many idle probes are running directly from the add_probe
    * call. Used to block any data flowing in the pad while the idle callback
    * Doesn't finish its work */
   gint idle_running;
+
+  /* conditional and variable used to ensure pads only get (de)activated
+   * by a single thread at a time. Protected by the object lock */
+  GCond activation_cond;
+  gboolean in_activation;
 };
 
 typedef struct
 {
   GHook hook;
-  guint cookie;
 } GstProbe;
 
-#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie)
 #define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \
     (((GstPad *)(p))->priv->idle_running > 0)
 
@@ -169,7 +168,11 @@ typedef struct
   gboolean pass;
   gboolean handled;
   gboolean marshalled;
-  guint cookie;
+
+  gulong *called_probes;
+  guint n_called_probes;
+  guint called_probes_size;
+  gboolean retry;
 } ProbeMarshall;
 
 static void gst_pad_dispose (GObject * object);
@@ -314,7 +317,8 @@ gst_pad_link_get_name (GstPadLinkReturn ret)
 }
 
 #define gst_pad_parent_class parent_class
-G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT, _do_init);
+G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT,
+    G_ADD_PRIVATE (GstPad) _do_init);
 
 static void
 gst_pad_class_init (GstPadClass * klass)
@@ -325,8 +329,6 @@ gst_pad_class_init (GstPadClass * klass)
   gobject_class = G_OBJECT_CLASS (klass);
   gstobject_class = GST_OBJECT_CLASS (klass);
 
-  g_type_class_add_private (klass, sizeof (GstPadPrivate));
-
   gobject_class->dispose = gst_pad_dispose;
   gobject_class->finalize = gst_pad_finalize;
   gobject_class->set_property = gst_pad_set_property;
@@ -396,7 +398,7 @@ gst_pad_class_init (GstPadClass * klass)
 static void
 gst_pad_init (GstPad * pad)
 {
-  pad->priv = GST_PAD_GET_PRIVATE (pad);
+  pad->priv = gst_pad_get_instance_private (pad);
 
   GST_PAD_DIRECTION (pad) = GST_PAD_UNKNOWN;
 
@@ -417,6 +419,8 @@ gst_pad_init (GstPad * pad)
   pad->priv->events = g_array_sized_new (FALSE, TRUE, sizeof (PadEvent), 16);
   pad->priv->events_cookie = 0;
   pad->priv->last_cookie = -1;
+  g_cond_init (&pad->priv->activation_cond);
+
   pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
 }
 
@@ -597,7 +601,7 @@ restart:
     if (G_UNLIKELY (ev->event == NULL))
       goto next;
 
-    /* take aditional ref, func might release the lock */
+    /* take additional ref, func might release the lock */
     ev_ret.event = gst_event_ref (ev->event);
     ev_ret.received = ev->received;
 
@@ -640,12 +644,13 @@ restart:
 
 /* should be called with LOCK */
 static GstEvent *
-_apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream)
+_apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream,
+    gint64 pad_offset)
 {
   gint64 offset;
 
   GST_DEBUG_OBJECT (pad, "apply pad offset %" GST_STIME_FORMAT,
-      GST_STIME_ARGS (pad->offset));
+      GST_STIME_ARGS (pad_offset));
 
   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
     GstSegment segment;
@@ -656,16 +661,16 @@ _apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream)
     gst_event_copy_segment (event, &segment);
     gst_event_unref (event);
 
-    gst_segment_offset_running_time (&segment, segment.format, pad->offset);
+    gst_segment_offset_running_time (&segment, segment.format, pad_offset);
     event = gst_event_new_segment (&segment);
   }
 
   event = gst_event_make_writable (event);
   offset = gst_event_get_running_time_offset (event);
   if (upstream)
-    offset -= pad->offset;
+    offset -= pad_offset;
   else
-    offset += pad->offset;
+    offset += pad_offset;
   gst_event_set_running_time_offset (event, offset);
 
   return event;
@@ -675,11 +680,10 @@ static inline GstEvent *
 apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream)
 {
   if (G_UNLIKELY (pad->offset != 0))
-    return _apply_pad_offset (pad, event, upstream);
+    return _apply_pad_offset (pad, event, upstream, pad->offset);
   return event;
 }
 
-
 /* should be called with the OBJECT_LOCK */
 static GstCaps *
 get_pad_caps (GstPad * pad)
@@ -762,6 +766,7 @@ gst_pad_finalize (GObject * object)
 
   g_rec_mutex_clear (&pad->stream_rec_lock);
   g_cond_clear (&pad->block_cond);
+  g_cond_clear (&pad->priv->activation_cond);
   g_array_free (pad->priv->events, TRUE);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -827,8 +832,7 @@ gst_pad_get_property (GObject * object, guint prop_id,
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  *
  * MT safe.
  */
@@ -849,15 +853,18 @@ gst_pad_new (const gchar * name, GstPadDirection direction)
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  */
 GstPad *
 gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name)
 {
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
+
   g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
 
-  return g_object_new (GST_TYPE_PAD,
+  return g_object_new (pad_type,
       "name", name, "direction", templ->direction, "template", templ, NULL);
 }
 
@@ -871,8 +878,7 @@ gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name)
  * will be assigned.
  * This function makes a copy of the name so you can safely free the name.
  *
- * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in
- * case of an error.
+ * Returns: (transfer floating): a new #GstPad.
  */
 GstPad *
 gst_pad_new_from_static_template (GstStaticPadTemplate * templ,
@@ -966,12 +972,15 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
   switch (new_mode) {
     case GST_PAD_MODE_NONE:
       GST_OBJECT_LOCK (pad);
+      while (G_UNLIKELY (pad->priv->in_activation))
+        g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad));
       if (new_mode == GST_PAD_MODE (pad)) {
         GST_WARNING_OBJECT (pad,
             "Pad is already in the process of being deactivated");
         GST_OBJECT_UNLOCK (pad);
         return FALSE;
       }
+      pad->priv->in_activation = TRUE;
       GST_DEBUG_OBJECT (pad, "setting PAD_MODE NONE, set flushing");
       GST_PAD_SET_FLUSHING (pad);
       pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING;
@@ -983,12 +992,15 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
     case GST_PAD_MODE_PUSH:
     case GST_PAD_MODE_PULL:
       GST_OBJECT_LOCK (pad);
+      while (G_UNLIKELY (pad->priv->in_activation))
+        g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad));
       if (new_mode == GST_PAD_MODE (pad)) {
         GST_WARNING_OBJECT (pad,
             "Pad is already in the process of being activated");
         GST_OBJECT_UNLOCK (pad);
         return FALSE;
       }
+      pad->priv->in_activation = TRUE;
       GST_DEBUG_OBJECT (pad, "setting pad into %s mode, unset flushing",
           gst_pad_mode_get_name (new_mode));
       GST_PAD_UNSET_FLUSHING (pad);
@@ -1025,6 +1037,11 @@ post_activate (GstPad * pad, GstPadMode new_mode)
 {
   switch (new_mode) {
     case GST_PAD_MODE_NONE:
+      GST_OBJECT_LOCK (pad);
+      pad->priv->in_activation = FALSE;
+      g_cond_broadcast (&pad->priv->activation_cond);
+      GST_OBJECT_UNLOCK (pad);
+
       /* ensures that streaming stops */
       GST_PAD_STREAM_LOCK (pad);
       GST_DEBUG_OBJECT (pad, "stopped streaming");
@@ -1035,6 +1052,10 @@ post_activate (GstPad * pad, GstPadMode new_mode)
       break;
     case GST_PAD_MODE_PUSH:
     case GST_PAD_MODE_PULL:
+      GST_OBJECT_LOCK (pad);
+      pad->priv->in_activation = FALSE;
+      g_cond_broadcast (&pad->priv->activation_cond);
+      GST_OBJECT_UNLOCK (pad);
       /* NOP */
       break;
   }
@@ -1256,6 +1277,8 @@ failure:
         active ? "activate" : "deactivate", gst_pad_mode_get_name (mode));
     GST_PAD_SET_FLUSHING (pad);
     GST_PAD_MODE (pad) = old;
+    pad->priv->in_activation = FALSE;
+    g_cond_broadcast (&pad->priv->activation_cond);
     GST_OBJECT_UNLOCK (pad);
     goto exit;
   }
@@ -1281,11 +1304,19 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
 {
   GstObject *parent;
   gboolean res;
+  GstPadMode old, new;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
 
   GST_OBJECT_LOCK (pad);
+
+  old = GST_PAD_MODE (pad);
+  new = active ? mode : GST_PAD_MODE_NONE;
+  if (old == new)
+    goto was_ok;
+
   ACQUIRE_PARENT (pad, parent, no_parent);
+
   GST_OBJECT_UNLOCK (pad);
 
   res = activate_mode_internal (pad, parent, mode, active);
@@ -1294,6 +1325,13 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active)
 
   return res;
 
+was_ok:
+  {
+    GST_OBJECT_UNLOCK (pad);
+    GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "already %s in %s mode",
+        active ? "activated" : "deactivated", gst_pad_mode_get_name (mode));
+    return TRUE;
+  }
 no_parent:
   {
     GST_WARNING_OBJECT (pad, "no parent");
@@ -1331,6 +1369,9 @@ cleanup_hook (GstPad * pad, GHook * hook)
 {
   GstPadProbeType type;
 
+  GST_DEBUG_OBJECT (pad,
+      "cleaning up hook %lu with flags %08x", hook->hook_id, hook->flags);
+
   if (!G_HOOK_IS_VALID (hook))
     return;
 
@@ -1399,7 +1440,7 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "adding probe for mask 0x%08x",
       mask);
 
-  /* when no contraints are given for the types, assume all types are
+  /* when no constraints are given for the types, assume all types are
    * acceptable */
   if ((mask & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH) == 0)
     mask |= GST_PAD_PROBE_TYPE_ALL_BOTH;
@@ -1411,12 +1452,11 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
   hook->func = callback;
   hook->data = user_data;
   hook->destroy = destroy_data;
-  PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1);
 
   /* add the probe */
   g_hook_append (&pad->probes, hook);
   pad->num_probes++;
-  /* incremenent cookie so that the new hook get's called */
+  /* incremenent cookie so that the new hook gets called */
   pad->priv->probe_list_cookie++;
 
   /* get the id of the hook, we return this and it can be used to remove the
@@ -2042,6 +2082,10 @@ gst_pad_set_link_function_full (GstPad * pad, GstPadLinkFunction link,
  *
  * Sets the given unlink function for the pad. It will be called
  * when the pad is unlinked.
+ *
+ * Note that the pad's lock is already held when the unlink
+ * function is called, so most pad functions cannot be called
+ * from within the callback.
  */
 void
 gst_pad_set_unlink_function_full (GstPad * pad, GstPadUnlinkFunction unlink,
@@ -2719,7 +2763,7 @@ gst_pad_get_pad_template_caps (GstPad * pad)
  * Gets the peer of @pad. This function refs the peer pad so
  * you need to unref it after use.
  *
- * Returns: (transfer full): the peer #GstPad. Unref after usage.
+ * Returns: (transfer full) (nullable): the peer #GstPad. Unref after usage.
  *
  * MT safe.
  */
@@ -3202,7 +3246,6 @@ done:
 /* Default latency implementation */
 typedef struct
 {
-  guint count;
   gboolean live;
   GstClockTime min, max;
 } LatencyFoldData;
@@ -3234,8 +3277,7 @@ query_latency_default_fold (const GValue * item, GValue * ret,
     GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT
         " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
 
-    /* FIXME : Why do we only take values into account if it's live ? */
-    if (live || fold_data->count == 0) {
+    if (live) {
       if (min > fold_data->min)
         fold_data->min = min;
 
@@ -3244,9 +3286,8 @@ query_latency_default_fold (const GValue * item, GValue * ret,
       else if (max < fold_data->max)
         fold_data->max = max;
 
-      fold_data->live = live;
+      fold_data->live = TRUE;
     }
-    fold_data->count += 1;
   } else if (peer) {
     GST_DEBUG_OBJECT (pad, "latency query failed");
     g_value_set_boolean (ret, FALSE);
@@ -3277,7 +3318,6 @@ gst_pad_query_latency_default (GstPad * pad, GstQuery * query)
   g_value_init (&ret, G_TYPE_BOOLEAN);
 
 retry:
-  fold_data.count = 0;
   fold_data.live = FALSE;
   fold_data.min = 0;
   fold_data.max = GST_CLOCK_TIME_NONE;
@@ -3379,6 +3419,10 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
       ret = gst_pad_query_latency_default (pad, query);
       forward = FALSE;
       break;
+    case GST_QUERY_BITRATE:
+      /* FIXME: better default handling */
+      forward = TRUE;
+      break;
     case GST_QUERY_POSITION:
     case GST_QUERY_SEEKING:
     case GST_QUERY_FORMATS:
@@ -3415,6 +3459,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
   return ret;
 }
 
+#define N_STACK_ALLOCATE_PROBES (16)
+
 static void
 probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 {
@@ -3424,16 +3470,36 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
   GstPadProbeCallback callback;
   GstPadProbeReturn ret;
   gpointer original_data;
+  guint i;
 
-  /* if we have called this callback, do nothing */
-  if (PROBE_COOKIE (hook) == data->cookie) {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u already called", hook->hook_id,
-        PROBE_COOKIE (hook));
-    return;
+  /* if we have called this callback, do nothing. But only check
+   * if we're actually calling probes a second time */
+  if (data->retry) {
+    for (i = 0; i < data->n_called_probes; i++) {
+      if (data->called_probes[i] == hook->hook_id) {
+        GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+            "hook %lu already called", hook->hook_id);
+        return;
+      }
+    }
   }
 
-  PROBE_COOKIE (hook) = data->cookie;
+  /* reallocate on the heap if we had more than 16 probes */
+  if (data->n_called_probes == data->called_probes_size) {
+    if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) {
+      data->called_probes_size *= 2;
+      data->called_probes =
+          g_renew (gulong, data->called_probes, data->called_probes_size);
+    } else {
+      gulong *tmp = data->called_probes;
+
+      data->called_probes_size *= 2;
+      data->called_probes = g_new (gulong, data->called_probes_size);
+      memcpy (data->called_probes, tmp,
+          N_STACK_ALLOCATE_PROBES * sizeof (gulong));
+    }
+  }
+  data->called_probes[data->n_called_probes++] = hook->hook_id;
 
   flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
   type = info->type;
@@ -3481,8 +3547,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
     goto no_match;
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id,
-      PROBE_COOKIE (hook), flags);
+      "hook %lu with flags 0x%08x matches", hook->hook_id, flags);
 
   data->marshalled = TRUE;
 
@@ -3492,13 +3557,20 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 
   info->id = hook->hook_id;
 
+  if ((flags & GST_PAD_PROBE_TYPE_IDLE))
+    pad->priv->idle_running++;
+
   GST_OBJECT_UNLOCK (pad);
 
   ret = callback (pad, info, hook->data);
 
   GST_OBJECT_LOCK (pad);
 
-  if (original_data != NULL && info->data == NULL) {
+  if ((flags & GST_PAD_PROBE_TYPE_IDLE))
+    pad->priv->idle_running--;
+
+  if (ret != GST_PAD_PROBE_HANDLED && original_data != NULL
+      && info->data == NULL) {
     GST_DEBUG_OBJECT (pad, "data item in pad probe info was dropped");
     info->type = GST_PAD_PROBE_TYPE_INVALID;
     data->dropped = TRUE;
@@ -3538,8 +3610,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 no_match:
   {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u with flags 0x%08x does not match %08x",
-        hook->hook_id, PROBE_COOKIE (hook), flags, info->type);
+        "hook %lu with flags 0x%08x does not match %08x",
+        hook->hook_id, flags, info->type);
     return;
   }
 }
@@ -3624,6 +3696,7 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   ProbeMarshall data;
   guint cookie;
   gboolean is_block;
+  gulong called_probes[N_STACK_ALLOCATE_PROBES];
 
   data.pad = pad;
   data.info = info;
@@ -3631,7 +3704,14 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   data.handled = FALSE;
   data.marshalled = FALSE;
   data.dropped = FALSE;
-  data.cookie = ++pad->priv->probe_cookie;
+
+  /* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed,
+   * we will re-allocate with g_malloc(). This should usually never be needed
+   */
+  data.called_probes = called_probes;
+  data.n_called_probes = 0;
+  data.called_probes_size = N_STACK_ALLOCATE_PROBES;
+  data.retry = FALSE;
 
   is_block =
       (info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK;
@@ -3642,18 +3722,18 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   }
 
 again:
-  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "do probes cookie %u", data.cookie);
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes");
   cookie = pad->priv->probe_list_cookie;
 
   g_hook_list_marshal (&pad->probes, TRUE,
       (GHookMarshaller) probe_hook_marshal, &data);
 
-  /* if the list changed, call the new callbacks (they will not have their
-   * cookie set to data.cookie */
+  /* if the list changed, call the new callbacks (they will not be in
+   * called_probes yet) */
   if (cookie != pad->priv->probe_list_cookie) {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
         "probe list changed, restarting");
+    data.retry = TRUE;
     goto again;
   }
 
@@ -3695,11 +3775,12 @@ again:
       GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
       GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
 
-      /* if the list changed, call the new callbacks (they will not have their
-       * cookie set to data.cookie */
+      /* if the list changed, call the new callbacks (they will not be in
+       * called_probes yet) */
       if (cookie != pad->priv->probe_list_cookie) {
         GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
             "probe list changed, restarting");
+        data.retry = TRUE;
         goto again;
       }
 
@@ -3708,28 +3789,39 @@ again:
     }
   }
 
+  if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+    g_free (data.called_probes);
+
   return defaultval;
 
   /* ERRORS */
 flushing:
   {
     GST_DEBUG_OBJECT (pad, "pad is flushing");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_FLUSHING;
   }
 dropped:
   {
     GST_DEBUG_OBJECT (pad, "data is dropped");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_CUSTOM_SUCCESS;
   }
 passed:
   {
     /* FIXME : Should we return FLOW_OK or the defaultval ?? */
     GST_DEBUG_OBJECT (pad, "data is passed");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_OK;
   }
 handled:
   {
     GST_DEBUG_OBJECT (pad, "data was handled");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_CUSTOM_SUCCESS_1;
   }
 }
@@ -3834,6 +3926,8 @@ push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data)
   } else {
     data->ret = gst_pad_push_event_unchecked (pad, gst_event_ref (event),
         GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
+    if (data->ret == GST_FLOW_CUSTOM_SUCCESS_1)
+      data->ret = GST_FLOW_OK;
   }
 
   switch (data->ret) {
@@ -5210,6 +5304,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
   GstFlowReturn ret;
   GstPad *peerpad;
   GstEventType event_type;
+  gint64 old_pad_offset = pad->offset;
 
   /* pass the adjusted event on. We need to do this even if
    * there is no peer pad because of the probes. */
@@ -5292,6 +5387,15 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event,
     events_foreach (pad, sticky_changed, &data);
   }
 
+  /* the pad offset might've been changed by any of the probes above. It
+   * would've been taken into account when repushing any of the sticky events
+   * above but not for our current event here */
+  if (G_UNLIKELY (old_pad_offset != pad->offset)) {
+    event =
+        _apply_pad_offset (pad, event, GST_PAD_IS_SINK (pad),
+        pad->offset - old_pad_offset);
+  }
+
   /* now check the peer pad */
   peerpad = GST_PAD_PEER (pad);
   if (peerpad == NULL)
@@ -5533,9 +5637,11 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
   GstPadEventFunction eventfunc;
   GstPadEventFullFunction eventfullfunc = NULL;
   GstObject *parent;
+  gint64 old_pad_offset;
 
   GST_OBJECT_LOCK (pad);
 
+  old_pad_offset = pad->offset;
   event = apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad));
 
   if (GST_PAD_IS_SINK (pad))
@@ -5631,6 +5737,15 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event,
 
   PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH, event, probe_stopped);
 
+  /* the pad offset might've been changed by any of the probes above. It
+   * would've been taken into account when repushing any of the sticky events
+   * above but not for our current event here */
+  if (G_UNLIKELY (old_pad_offset != pad->offset)) {
+    event =
+        _apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad),
+        pad->offset - old_pad_offset);
+  }
+
   eventfullfunc = GST_PAD_EVENTFULLFUNC (pad);
   eventfunc = GST_PAD_EVENTFUNC (pad);
   if (G_UNLIKELY (eventfunc == NULL && eventfullfunc == NULL))
@@ -5863,7 +5978,7 @@ gst_pad_set_element_private (GstPad * pad, gpointer priv)
  * Gets the private data of a pad.
  * No locking is performed in this function.
  *
- * Returns: (transfer none): a #gpointer to the private data.
+ * Returns: (transfer none) (nullable): a #gpointer to the private data.
  */
 gpointer
 gst_pad_get_element_private (GstPad * pad)
@@ -6095,6 +6210,9 @@ gst_pad_pause_task (GstPad * pad)
   if (task == NULL)
     goto no_task;
   res = gst_task_set_state (task, GST_TASK_PAUSED);
+  /* unblock activation waits if any */
+  pad->priv->in_activation = FALSE;
+  g_cond_broadcast (&pad->priv->activation_cond);
   GST_OBJECT_UNLOCK (pad);
 
   /* wait for task function to finish, this lock is recursive so it does nothing
@@ -6180,6 +6298,9 @@ gst_pad_stop_task (GstPad * pad)
     goto no_task;
   GST_PAD_TASK (pad) = NULL;
   res = gst_task_set_state (task, GST_TASK_STOPPED);
+  /* unblock activation waits if any */
+  pad->priv->in_activation = FALSE;
+  g_cond_broadcast (&pad->priv->activation_cond);
   GST_OBJECT_UNLOCK (pad);
 
   GST_PAD_STREAM_LOCK (pad);
@@ -6223,7 +6344,7 @@ join_failed:
  * gst_pad_probe_info_get_event:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstEvent from the probe
+ * Returns: (transfer none) (nullable): The #GstEvent from the probe
  */
 
 GstEvent *
@@ -6240,7 +6361,7 @@ gst_pad_probe_info_get_event (GstPadProbeInfo * info)
  * gst_pad_probe_info_get_query:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstQuery from the probe
+ * Returns: (transfer none) (nullable): The #GstQuery from the probe
  */
 
 GstQuery *
@@ -6256,7 +6377,7 @@ gst_pad_probe_info_get_query (GstPadProbeInfo * info)
  * gst_pad_probe_info_get_buffer:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstBuffer from the probe
+ * Returns: (transfer none) (nullable): The #GstBuffer from the probe
  */
 
 GstBuffer *
@@ -6271,7 +6392,7 @@ gst_pad_probe_info_get_buffer (GstPadProbeInfo * info)
  * gst_pad_probe_info_get_buffer_list:
  * @info: a #GstPadProbeInfo
  *
- * Returns: (transfer none): The #GstBufferList from the probe
+ * Returns: (transfer none) (nullable): The #GstBufferList from the probe
  */
 
 GstBufferList *