pad: rework sticky events
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 22 Nov 2011 17:32:51 +0000 (18:32 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 22 Nov 2011 17:40:49 +0000 (18:40 +0100)
Rewrite sticky events, trying to make it a bit more simple.
When sticky events are pushed on a srcpad, store them in the sticky event
array and mark the event with received = FALSE.
When the sticky event is successfully sent to the peer pad, make
received = TRUE.
Keep a PENDING_EVENTS pad flag that is set when one of the events is in
the received = FALSE state for some reason.
when activating a sinkpad, mark all events received = FALSE on the peer
srcpad.
When pushing a buffer, check the PENDING_EVENTS flag and if it is set, push all
events to the peer pad first.

gst/gstpad.c
gst/gstpad.h

index 4c5b2aa..79b454c 100644 (file)
@@ -101,7 +101,7 @@ enum
  * moved to the active event when the eventfunc returned TRUE. */
 typedef struct
 {
-  GstEvent *pending;
+  gboolean received;
   GstEvent *event;
 } PadEvent;
 
@@ -325,92 +325,66 @@ gst_pad_init (GstPad * pad)
 }
 
 static void
-clear_event (PadEvent events[], guint idx)
+clear_event (GstPad * pad, guint idx)
 {
-  gst_event_replace (&events[idx].event, NULL);
-  gst_event_replace (&events[idx].pending, NULL);
+  gst_event_replace (&pad->priv->events[idx].event, NULL);
+  pad->priv->events[idx].received = FALSE;
 }
 
 /* called when setting the pad inactive. It removes all sticky events from
  * the pad */
 static void
-clear_events (PadEvent events[])
+clear_events (GstPad * pad)
 {
   guint i;
 
   for (i = 0; i < GST_EVENT_MAX_STICKY; i++)
-    clear_event (events, i);
+    clear_event (pad, i);
+
+  GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
 }
 
-/* The sticky event with @idx from the srcpad is copied to the
- * pending event on the sinkpad (when different).
- * This function applies the pad offsets in case of segment events.
- * This will make sure that we send the event to the sinkpad event
- * function when the next buffer of event arrives.
- * Should be called with the OBJECT lock of both pads.
- * This function returns TRUE when there is a pending event on the
- * sinkpad */
 static gboolean
-replace_event (GstPad * srcpad, GstPad * sinkpad, guint idx)
+reschedule_event (GstPad * pad, guint idx)
 {
-  PadEvent *srcev, *sinkev;
-  GstEvent *event;
-  gboolean pending = FALSE;
-
-  srcev = &srcpad->priv->events[idx];
-
-  if ((event = srcev->event)) {
-    sinkev = &sinkpad->priv->events[idx];
-
-    switch (GST_EVENT_TYPE (event)) {
-      case GST_EVENT_SEGMENT:
-      {
-        GstSegment segment;
-        gint64 offset;
-
-        offset = srcpad->offset + sinkpad->offset;
-        if (offset != 0) {
-          gst_event_copy_segment (event, &segment);
-          /* adjust the base time. FIXME, check negative times, try to tweak the
-           * start to do clipping on negative times */
-          segment.base += offset;
-          /* make a new event from the updated segment */
-          event = gst_event_new_segment (&segment);
-        }
-        break;
-      }
-      default:
-        break;
-    }
-    if (sinkev->event != event) {
-      /* put in the pending entry when different */
-      GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, srcpad,
-          "Putting event %p (%s) on pad %s:%s", event,
-          GST_EVENT_TYPE_NAME (event), GST_DEBUG_PAD_NAME (sinkpad));
-      gst_event_replace (&sinkev->pending, event);
-      pending = TRUE;
-    }
+  if (pad->priv->events[idx].event) {
+    pad->priv->events[idx].received = FALSE;
+    return TRUE;
   }
-  return pending;
+  return FALSE;
 }
 
-
-/* srcpad and sinkpad LOCK must be taken */
 static void
-prepare_event_update (GstPad * srcpad, GstPad * sinkpad)
+reschedule_events (GstPad * pad)
 {
-  gboolean pending;
-  gint i;
+  guint i;
+  gboolean pending = FALSE;
 
-  /* make sure we push the events from the source to this new peer, for this we
-   * copy the events on the sinkpad and mark EVENTS_PENDING */
-  pending = FALSE;
   for (i = 0; i < GST_EVENT_MAX_STICKY; i++)
-    pending |= replace_event (srcpad, sinkpad, i);
+    pending |= reschedule_event (pad, i);
 
-  /* we had some new pending events, set our flag */
   if (pending)
-    GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_NEED_EVENTS);
+    GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+}
+
+
+/* should be called with LOCK */
+static GstEvent *
+apply_pad_offset (GstPad * pad, GstEvent * event)
+{
+  /* check if we need to adjust the segment */
+  if (pad->offset != 0) {
+    GstSegment segment;
+
+    /* copy segment values */
+    gst_event_copy_segment (event, &segment);
+    gst_event_unref (event);
+
+    /* adjust and make a new event with the offset applied */
+    segment.base += pad->offset;
+    event = gst_event_new_segment (&segment);
+  }
+  return event;
 }
 
 /* should be called with the OBJECT_LOCK */
@@ -453,7 +427,7 @@ gst_pad_dispose (GObject * object)
 
   gst_pad_set_pad_template (pad, NULL);
 
-  clear_events (pad->priv->events);
+  clear_events (pad);
 
   g_hook_list_clear (&pad->probes);
 
@@ -676,7 +650,27 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
       GST_DEBUG_OBJECT (pad, "setting PAD_MODE %d, unset flushing", new_mode);
       GST_PAD_UNSET_FLUSHING (pad);
       GST_PAD_MODE (pad) = new_mode;
-      GST_OBJECT_UNLOCK (pad);
+      if (GST_PAD_IS_SINK (pad)) {
+        GstPad *peer;
+        /* make sure the peer src pad sends us all events */
+        if ((peer = GST_PAD_PEER (pad))) {
+          gst_object_ref (peer);
+          GST_OBJECT_UNLOCK (pad);
+
+          GST_DEBUG_OBJECT (pad, "reschedule events on peer %s:%s",
+              GST_DEBUG_PAD_NAME (peer));
+
+          GST_OBJECT_LOCK (peer);
+          reschedule_events (peer);
+          GST_OBJECT_UNLOCK (peer);
+
+          gst_object_unref (peer);
+        } else {
+          GST_OBJECT_UNLOCK (pad);
+        }
+      } else {
+        GST_OBJECT_UNLOCK (pad);
+      }
       break;
   }
 }
@@ -690,7 +684,7 @@ post_activate (GstPad * pad, GstPadMode new_mode)
       GST_PAD_STREAM_LOCK (pad);
       GST_DEBUG_OBJECT (pad, "stopped streaming");
       GST_OBJECT_LOCK (pad);
-      clear_events (pad->priv->events);
+      clear_events (pad);
       GST_OBJECT_UNLOCK (pad);
       GST_PAD_STREAM_UNLOCK (pad);
       break;
@@ -1477,7 +1471,6 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
 {
   gboolean result = FALSE;
   GstElement *parent = NULL;
-  gint i;
 
   g_return_val_if_fail (GST_IS_PAD (srcpad), FALSE);
   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), FALSE);
@@ -1519,10 +1512,6 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
   GST_PAD_PEER (srcpad) = NULL;
   GST_PAD_PEER (sinkpad) = NULL;
 
-  /* clear pending caps if any */
-  for (i = 0; i < GST_EVENT_MAX_STICKY; i++)
-    gst_event_replace (&sinkpad->priv->events[i].pending, NULL);
-
   GST_OBJECT_UNLOCK (sinkpad);
   GST_OBJECT_UNLOCK (srcpad);
 
@@ -1847,6 +1836,7 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
   GstPadLinkReturn result;
   GstElement *parent;
   GstPadLinkFunction srcfunc, sinkfunc;
+  guint idx;
 
   g_return_val_if_fail (GST_IS_PAD (srcpad), GST_PAD_LINK_REFUSED);
   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), GST_PAD_LINK_WRONG_DIRECTION);
@@ -1876,8 +1866,18 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
   GST_PAD_PEER (srcpad) = sinkpad;
   GST_PAD_PEER (sinkpad) = srcpad;
 
-  /* make sure we update events */
-  prepare_event_update (srcpad, sinkpad);
+  /* check events, when something is different, mark pending */
+  for (idx = 0; idx < GST_EVENT_MAX_STICKY; idx++) {
+    PadEvent *srcev, *sinkev;
+
+    srcev = &srcpad->priv->events[idx];
+    sinkev = &sinkpad->priv->events[idx];
+
+    if (srcev->event != sinkev->event) {
+      GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PENDING_EVENTS);
+      break;
+    }
+  }
 
   /* get the link functions */
   srcfunc = GST_PAD_LINKFUNC (srcpad);
@@ -2100,131 +2100,6 @@ gst_pad_set_caps (GstPad * pad, GstCaps * caps)
   return res;
 }
 
-static gboolean
-do_event_function (GstPad * pad, GstObject * parent, GstEvent * event,
-    GstPadEventFunction eventfunc, gboolean * caps_notify)
-{
-  gboolean result = TRUE, call_event = TRUE;
-  GstCaps *caps, *old, *templ;
-
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_CAPS:
-    {
-      /* backwards compatibility mode for caps */
-      gst_event_parse_caps (event, &caps);
-
-      /* See if pad accepts the caps */
-      templ = gst_pad_get_pad_template_caps (pad);
-      if (!gst_caps_is_subset (caps, templ))
-        goto not_accepted;
-
-      /* check if it changed */
-      if ((old = gst_pad_get_current_caps (pad))) {
-        call_event = !gst_caps_is_equal (caps, old);
-        gst_caps_unref (old);
-      }
-      if (call_event)
-        *caps_notify = TRUE;
-      gst_caps_unref (templ);
-      break;
-    }
-    default:
-      break;
-  }
-
-  if (call_event) {
-    GST_DEBUG_OBJECT (pad, "calling event function with event %p", event);
-    result = eventfunc (pad, parent, event);
-  } else {
-    gst_event_unref (event);
-  }
-  return result;
-
-  /* ERRORS */
-not_accepted:
-  {
-    gst_caps_unref (templ);
-    gst_event_unref (event);
-    GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad,
-        "caps %" GST_PTR_FORMAT " not accepted", caps);
-    return FALSE;
-  }
-}
-
-/* function to send all pending events on the sinkpad to the event
- * function and collect the results. This function should be called with
- * the object lock. The object lock might be released by this function.
- */
-static GstFlowReturn
-gst_pad_update_events (GstPad * pad)
-{
-  GstObject *parent;
-  GstFlowReturn ret = GST_FLOW_OK;
-  guint i;
-  GstPadEventFunction eventfunc;
-  GstEvent *event;
-  gboolean caps_notify = FALSE;
-  PadEvent *ev;
-
-  if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL))
-    goto no_function;
-
-  for (i = 0; i < GST_EVENT_MAX_STICKY; i++) {
-    gboolean res;
-
-    ev = &pad->priv->events[i];
-
-    /* skip without pending event */
-    if ((event = gst_event_steal (&ev->pending)) == NULL)
-      continue;
-
-    ACQUIRE_PARENT (pad, parent, no_parent);
-
-    gst_event_ref (event);
-    GST_OBJECT_UNLOCK (pad);
-
-    res = do_event_function (pad, parent, event, eventfunc, &caps_notify);
-
-    RELEASE_PARENT (parent);
-
-    /* things could have changed while we release the lock, check if we still
-     * are handling the same event, if we don't something changed and we have
-     * to try again. FIXME. we need a cookie here. FIXME, we also want to remove
-     * that lock eventually and then do the retry elsewhere. */
-
-    if (res) {
-      /* make the event active */
-      gst_event_take (&ev->event, event);
-
-      /* notify caps change when needed */
-      if (caps_notify) {
-        g_object_notify_by_pspec ((GObject *) pad, pspec_caps);
-        caps_notify = FALSE;
-      }
-    } else {
-      gst_event_unref (event);
-      ret = GST_FLOW_ERROR;
-    }
-    GST_OBJECT_LOCK (pad);
-  }
-  /* when we get here all events were successfully updated. */
-  return ret;
-
-  /* ERRORS */
-no_function:
-  {
-    g_warning ("pad %s:%s has no event handler, file a bug.",
-        GST_DEBUG_PAD_NAME (pad));
-    return GST_FLOW_NOT_SUPPORTED;
-  }
-no_parent:
-  {
-    GST_DEBUG_OBJECT (pad, "pad has no parent");
-    gst_event_take (&ev->pending, event);
-    return GST_FLOW_WRONG_STATE;
-  }
-}
-
 /**
  * gst_pad_get_pad_template_caps:
  * @pad: a #GstPad to get the template capabilities from.
@@ -3030,8 +2905,6 @@ void
 gst_pad_set_offset (GstPad * pad, gint64 offset)
 {
   guint idx;
-  GstPad *peer;
-  GstPad *tmp = NULL;
 
   g_return_if_fail (GST_IS_PAD (pad));
 
@@ -3041,42 +2914,21 @@ gst_pad_set_offset (GstPad * pad, gint64 offset)
     goto done;
 
   pad->offset = offset;
+  GST_DEBUG_OBJECT (pad, "changed offset to %" G_GINT64_FORMAT, offset);
 
-  /* if no peer, we just updated the offset */
-  if ((peer = GST_PAD_PEER (pad)) == NULL)
+  /* sinkpads will apply their offset the next time a segment
+   * event is received. */
+  if (GST_PAD_IS_SINK (pad))
     goto done;
 
-  /* switch pads around when dealing with a sinkpad */
-  if (GST_PAD_IS_SINK (pad)) {
-    /* ref the peer so it doesn't go away when we release the lock */
-    tmp = gst_object_ref (peer);
-    /* make sure we get the peer (the srcpad) */
-    GST_OBJECT_UNLOCK (pad);
-
-    /* swap pads */
-    peer = pad;
-    pad = tmp;
-
-    GST_OBJECT_LOCK (pad);
-    /* check if the pad didn't get relinked */
-    if (GST_PAD_PEER (pad) != peer)
-      goto done;
-
-    /* we can release the ref now */
-    gst_object_unref (peer);
-  }
-
   /* the index of the segment event in the array */
   idx = GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_SEGMENT);
 
-  /* lock order is srcpad >> sinkpad */
-  GST_OBJECT_LOCK (peer);
-  /* take the current segment event, adjust it and then place
-   * it on the sinkpad. events on the srcpad are always active. */
-  if (replace_event (pad, peer, idx))
-    GST_OBJECT_FLAG_SET (peer, GST_PAD_FLAG_NEED_EVENTS);
-
-  GST_OBJECT_UNLOCK (peer);
+  /* resend the last segment event on next buffer push */
+  if (pad->priv->events[idx].event) {
+    pad->priv->events[idx].received = FALSE;
+    GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+  }
 
 done:
   GST_OBJECT_UNLOCK (pad);
@@ -3257,6 +3109,56 @@ probe_stopped:
  * Data passing functions
  */
 
+/* should be called with the LOCK */
+static inline GstFlowReturn
+gst_pad_push_sticky_events (GstPad * pad)
+{
+  GstFlowReturn ret;
+  gint i;
+
+restart:
+  ret = GST_FLOW_OK;
+
+  GST_DEBUG_OBJECT (pad, "pushing out all sticky events");
+  for (i = 0; i < GST_EVENT_MAX_STICKY; i++) {
+    gboolean res;
+    PadEvent *ev;
+    GstEvent *event;
+
+    ev = &pad->priv->events[i];
+
+    /* skip without active event */
+    if ((event = ev->event) == NULL)
+      continue;
+
+    if (ev->received) {
+      GST_DEBUG_OBJECT (pad, "event %s was already received",
+          GST_EVENT_TYPE_NAME (event));
+      continue;
+    }
+
+    gst_event_ref (event);
+    GST_OBJECT_UNLOCK (pad);
+
+    res = gst_pad_push_event (pad, event);
+
+    GST_OBJECT_LOCK (pad);
+    if (!res) {
+      ret = GST_FLOW_ERROR;
+      break;
+    }
+    /* things could have changed while we release the lock, check if we still
+     * are handling the same event, if we don't something changed and we have
+     * to try again. FIXME. we need a cookie here. */
+    if (event != ev->event) {
+      GST_DEBUG_OBJECT (pad, "events changed, restarting");
+      goto restart;
+    }
+  }
+
+  return ret;
+}
+
 /* this is the chain function that does not perform the additional argument
  * checking for that little extra speed.
  */
@@ -3265,7 +3167,6 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
 {
   GstFlowReturn ret;
   GstObject *parent;
-  gboolean needs_events;
 
   GST_PAD_STREAM_LOCK (pad);
 
@@ -3273,16 +3174,6 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
 
-  needs_events = GST_PAD_NEEDS_EVENTS (pad);
-  if (G_UNLIKELY (needs_events)) {
-    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS);
-
-    GST_DEBUG_OBJECT (pad, "need to update all events");
-    ret = gst_pad_update_events (pad);
-    if (G_UNLIKELY (ret != GST_FLOW_OK))
-      goto events_error;
-  }
-
   PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
 
   PROBE_PUSH (pad, type, data, probe_stopped);
@@ -3341,14 +3232,6 @@ flushing:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return GST_FLOW_WRONG_STATE;
   }
-events_error:
-  {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted");
-    GST_OBJECT_UNLOCK (pad);
-    GST_PAD_STREAM_UNLOCK (pad);
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
-    return ret;
-  }
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
@@ -3491,6 +3374,13 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
 
+  if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) {
+    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+
+    if ((ret = gst_pad_push_sticky_events (pad)))
+      goto events_error;
+  }
+
   /* do block probes */
   PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
 
@@ -3530,6 +3420,13 @@ flushing:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return GST_FLOW_WRONG_STATE;
   }
+events_error:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "error pushing events");
+    GST_OBJECT_UNLOCK (pad);
+    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    return ret;
+  }
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
@@ -3640,6 +3537,13 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
 
+  if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) {
+    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+
+    if ((ret = gst_pad_push_sticky_events (pad)))
+      goto events_error;
+  }
+
   /* when one of the probes returns a buffer, probed_data will be called and we
    * skip calling the getrange function */
   PROBE_PRE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK,
@@ -3683,6 +3587,13 @@ flushing:
     GST_PAD_STREAM_UNLOCK (pad);
     return GST_FLOW_WRONG_STATE;
   }
+events_error:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "error pushing events");
+    GST_OBJECT_UNLOCK (pad);
+    GST_PAD_STREAM_UNLOCK (pad);
+    return ret;
+  }
 no_parent:
   {
     GST_DEBUG_OBJECT (pad, "no parent");
@@ -3793,7 +3704,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
 {
   GstPad *peer;
   GstFlowReturn ret;
-  gboolean needs_events;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR);
@@ -3834,15 +3744,6 @@ probed_data:
   PROBE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BUFFER,
       *buffer, offset, size, post_probe_stopped);
 
-  needs_events = GST_PAD_NEEDS_EVENTS (pad);
-  if (G_UNLIKELY (needs_events)) {
-    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS);
-
-    GST_DEBUG_OBJECT (pad, "we need to update the events");
-    ret = gst_pad_update_events (pad);
-    if (G_UNLIKELY (ret != GST_FLOW_OK))
-      goto events_error;
-  }
   GST_OBJECT_UNLOCK (pad);
 
   return ret;
@@ -3888,15 +3789,37 @@ post_probe_stopped:
     *buffer = NULL;
     return ret;
   }
-events_error:
-  {
-    GST_OBJECT_UNLOCK (pad);
-    gst_buffer_unref (*buffer);
-    *buffer = NULL;
-    GST_CAT_WARNING_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pullrange returned events that were not accepted");
-    return ret;
+}
+
+static gboolean
+gst_pad_store_sticky_event (GstPad * pad, GstEvent * event, gboolean locked)
+{
+  guint idx;
+  gboolean ret;
+
+  idx = GST_EVENT_STICKY_IDX (event);
+
+  ret = gst_event_replace (&pad->priv->events[idx].event, event);
+
+  if (ret) {
+    GST_LOG_OBJECT (pad, "stored sticky event %s at index %u",
+        GST_EVENT_TYPE_NAME (event), idx);
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_CAPS:
+        if (locked)
+          GST_OBJECT_UNLOCK (pad);
+
+        GST_DEBUG_OBJECT (pad, "notify caps");
+        g_object_notify_by_pspec ((GObject *) pad, pspec_caps);
+
+        if (locked)
+          GST_OBJECT_LOCK (pad);
+        break;
+      default:
+        break;
+    }
   }
+  return ret;
 }
 
 /**
@@ -3923,18 +3846,29 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   gboolean result;
   gboolean stored = FALSE;
   GstPadProbeType type;
+  guint idx = 0;
+  gboolean sticky;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
   g_return_val_if_fail (GST_IS_EVENT (event), FALSE);
 
-  if (GST_EVENT_IS_DOWNSTREAM (event))
+  if (GST_PAD_IS_SRC (pad)) {
+    if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event)))
+      goto wrong_direction;
+    if ((sticky = GST_EVENT_IS_STICKY (event)))
+      idx = GST_EVENT_STICKY_IDX (event);
     type = GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM;
-  else
+  } else if (GST_PAD_IS_SINK (pad)) {
+    if (G_UNLIKELY (!GST_EVENT_IS_UPSTREAM (event)))
+      goto wrong_direction;
+    /* events pushed on sinkpad never are sticky */
+    sticky = FALSE;
     type = GST_PAD_PROBE_TYPE_EVENT_UPSTREAM;
+  } else
+    goto unknown_direction;
 
   GST_OBJECT_LOCK (pad);
-
   peerpad = GST_PAD_PEER (pad);
 
   /* Two checks to be made:
@@ -3959,8 +3893,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
 
       /* Remove sticky EOS events */
       GST_LOG_OBJECT (pad, "Removing pending EOS events");
-      clear_event (pad->priv->events,
-          GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
+      clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
 
       if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
         GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
@@ -3972,53 +3905,29 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
       if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
         goto flushed;
 
-      /* store the event on the pad, but only on srcpads */
-      if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
-        guint idx;
-
-        idx = GST_EVENT_STICKY_IDX (event);
-        GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
-            GST_EVENT_TYPE_NAME (event), idx);
-
-        /* srcpad sticky events always become active immediately */
-        gst_event_replace (&pad->priv->events[idx].event, event);
-
+      /* store the event on the pad, but only on srcpads. We always store the
+       * event exactly how it was sent */
+      if (sticky) {
+        /* srcpad sticky events are store immediately, we set the pending flag
+         * to TRUE, it will be set to FALSE when we can successfully push the
+         * event to the peer pad */
+        if (gst_pad_store_sticky_event (pad, event, TRUE)) {
+          GST_DEBUG_OBJECT (pad, "event %s updated",
+              GST_EVENT_TYPE_NAME (event));
+          pad->priv->events[idx].received = FALSE;
+        }
+        /* the peerpad might have changed. Things we checked above could not
+         * have changed. */
+        peerpad = GST_PAD_PEER (pad);
         stored = TRUE;
       }
 
-      /* backwards compatibility mode for caps */
       switch (GST_EVENT_TYPE (event)) {
-        case GST_EVENT_CAPS:
-        {
-          GST_OBJECT_UNLOCK (pad);
-
-          g_object_notify_by_pspec ((GObject *) pad, pspec_caps);
-
-          GST_OBJECT_LOCK (pad);
-          /* the peerpad might have changed. Things we checked above could not
-           * have changed. */
-          peerpad = GST_PAD_PEER (pad);
-          break;
-        }
         case GST_EVENT_SEGMENT:
-        {
-          gint64 offset;
-
-          offset = pad->offset;
-          /* check if we need to adjust the segment */
-          if (offset != 0 && (peerpad != NULL)) {
-            GstSegment segment;
-
-            /* copy segment values */
-            gst_event_copy_segment (event, &segment);
-            gst_event_unref (event);
-
-            /* adjust and make a new event with the offset applied */
-            segment.base += offset;
-            event = gst_event_new_segment (&segment);
-          }
+          /* pass the adjusted segment event on. We need to do this even if
+           * there is no peer pad because of the probes. */
+          event = apply_pad_offset (pad, event);
           break;
-        }
         case GST_EVENT_RECONFIGURE:
           if (GST_PAD_IS_SINK (pad))
             GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE);
@@ -4026,10 +3935,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
         default:
           break;
       }
-
       PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH |
           GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped);
-
       break;
     }
   }
@@ -4059,17 +3966,39 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   gst_object_unref (peerpad);
 
   GST_OBJECT_LOCK (pad);
+  if (sticky) {
+    if (result) {
+      pad->priv->events[idx].received = TRUE;
+      GST_DEBUG_OBJECT (pad, "event cleared pending");
+    } else {
+      GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+      GST_DEBUG_OBJECT (pad, "mark pending events");
+    }
+  }
   pad->priv->using--;
   if (pad->priv->using == 0) {
     /* pad is not active anymore, trigger idle callbacks */
     PROBE_NO_DATA (pad, GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_IDLE,
-        probe_stopped, GST_FLOW_OK);
+        idle_probe_stopped, GST_FLOW_OK);
   }
   GST_OBJECT_UNLOCK (pad);
 
   return result | stored;
 
   /* ERROR handling */
+wrong_direction:
+  {
+    g_warning ("pad %s:%s pushing %s event in wrong direction",
+        GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event));
+    gst_event_unref (event);
+    return FALSE;
+  }
+unknown_direction:
+  {
+    g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad));
+    gst_event_unref (event);
+    return FALSE;
+  }
 flushed:
   {
     GST_DEBUG_OBJECT (pad, "We're flushing");
@@ -4080,6 +4009,14 @@ flushed:
 probe_stopped:
   {
     GST_DEBUG_OBJECT (pad, "Probe returned %s", gst_flow_get_name (ret));
+    GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
+    GST_OBJECT_UNLOCK (pad);
+    gst_event_unref (event);
+    return stored;
+  }
+idle_probe_stopped:
+  {
+    GST_DEBUG_OBJECT (pad, "Idle probe returned %s", gst_flow_get_name (ret));
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
     return stored;
@@ -4087,12 +4024,48 @@ probe_stopped:
 not_linked:
   {
     GST_DEBUG_OBJECT (pad, "Dropping event because pad is not linked");
+    GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
     GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
     return stored;
   }
 }
 
+/* Check if we can call the event function with the given event */
+static GstFlowReturn
+pre_eventfunc_check (GstPad * pad, GstEvent * event)
+{
+  GstCaps *caps, *templ;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CAPS:
+    {
+      /* backwards compatibility mode for caps */
+      gst_event_parse_caps (event, &caps);
+
+      /* See if pad accepts the caps */
+      templ = gst_pad_get_pad_template_caps (pad);
+      if (!gst_caps_is_subset (caps, templ))
+        goto not_accepted;
+
+      gst_caps_unref (templ);
+      break;
+    }
+    default:
+      break;
+  }
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+not_accepted:
+  {
+    gst_caps_unref (templ);
+    GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad,
+        "caps %" GST_PTR_FORMAT " not accepted", caps);
+    return GST_FLOW_NOT_NEGOTIATED;
+  }
+}
+
 /**
  * gst_pad_send_event:
  * @pad: a #GstPad to send the event to.
@@ -4126,13 +4099,14 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
 {
   GstFlowReturn ret;
   gboolean result = FALSE;
-  gboolean serialized, need_unlock = FALSE, needs_events, sticky;
+  gboolean serialized, need_unlock = FALSE, sticky;
+  GstPadEventFunction eventfunc;
+  GstObject *parent;
   GstPadProbeType type;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
 
-  GST_OBJECT_LOCK (pad);
   if (GST_PAD_IS_SINK (pad)) {
     if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event)))
       goto wrong_direction;
@@ -4148,10 +4122,7 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
   } else
     goto unknown_direction;
 
-  /* get the flag first, we clear it when we have a FLUSH or a non-serialized
-   * event. */
-  needs_events = GST_PAD_NEEDS_EVENTS (pad);
-
+  GST_OBJECT_LOCK (pad);
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad,
@@ -4163,7 +4134,6 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
 
       GST_PAD_SET_FLUSHING (pad);
       GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag");
-      needs_events = FALSE;
       break;
     case GST_EVENT_FLUSH_STOP:
       if (G_LIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_NONE)) {
@@ -4172,15 +4142,15 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
       }
       /* Remove pending EOS events */
       GST_LOG_OBJECT (pad, "Removing pending EOS events");
-      clear_event (pad->priv->events,
-          GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
+      clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
 
       GST_OBJECT_UNLOCK (pad);
       /* grab stream lock */
       GST_PAD_STREAM_LOCK (pad);
       need_unlock = TRUE;
       GST_OBJECT_LOCK (pad);
-      needs_events = FALSE;
+      if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+        goto flushing;
       break;
     case GST_EVENT_RECONFIGURE:
       if (GST_PAD_IS_SRC (pad))
@@ -4198,89 +4168,50 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
         GST_PAD_STREAM_LOCK (pad);
         need_unlock = TRUE;
         GST_OBJECT_LOCK (pad);
-      } else {
-        /* don't forward events on non-serialized events */
-        needs_events = FALSE;
+        if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+          goto flushing;
       }
 
-      /* store the event on the pad, but only on srcpads. We need to store the
-       * event before checking the flushing flag. */
-      if (sticky) {
-        guint idx;
-        PadEvent *ev;
-
-        switch (GST_EVENT_TYPE (event)) {
-          case GST_EVENT_SEGMENT:
-            if (pad->offset != 0) {
-              GstSegment segment;
-
-              /* copy segment values */
-              gst_event_copy_segment (event, &segment);
-              gst_event_unref (event);
-
-              /* adjust and make a new event with the offset applied */
-              segment.base += pad->offset;
-              event = gst_event_new_segment (&segment);
-            }
-            break;
-          default:
-            break;
-        }
-
-        idx = GST_EVENT_STICKY_IDX (event);
-        ev = &pad->priv->events[idx];
-
-        if (ev->event != event) {
-          GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
-              GST_EVENT_TYPE_NAME (event), idx);
-          gst_event_replace (&ev->pending, event);
-          /* set the flag so that we update the events next time. We would
-           * usually update below but we might be flushing too. */
-          GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_EVENTS);
-          needs_events = TRUE;
-        }
+      switch (GST_EVENT_TYPE (event)) {
+        case GST_EVENT_SEGMENT:
+          event = apply_pad_offset (pad, event);
+          break;
+        default:
+          break;
       }
+
       /* now do the probe */
       PROBE_PUSH (pad,
           type | GST_PAD_PROBE_TYPE_PUSH |
           GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped);
 
       PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH, event, probe_stopped);
-
       break;
   }
 
-  if (G_UNLIKELY (needs_events)) {
-    GstFlowReturn ret;
-
-    GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS);
-
-    GST_DEBUG_OBJECT (pad, "need to update all events");
-    ret = gst_pad_update_events (pad);
-    if (ret != GST_FLOW_OK)
-      goto update_failed;
-    GST_OBJECT_UNLOCK (pad);
+  if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL))
+    goto no_function;
 
-    gst_event_unref (event);
+  ACQUIRE_PARENT (pad, parent, no_parent);
+  GST_OBJECT_UNLOCK (pad);
 
-    result = TRUE;
-  }
+  if (G_UNLIKELY (pre_eventfunc_check (pad, event) != GST_FLOW_OK))
+    goto precheck_failed;
 
-  /* ensure to pass on event;
-   * note that a sticky event has already been updated above */
-  if (G_LIKELY (!needs_events || !sticky)) {
-    GstPadEventFunction eventfunc;
-    GstObject *parent;
+  if (sticky)
+    gst_event_ref (event);
 
-    if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL))
-      goto no_function;
+  result = eventfunc (pad, parent, event);
 
-    ACQUIRE_PARENT (pad, parent, no_parent);
-    GST_OBJECT_UNLOCK (pad);
+  RELEASE_PARENT (parent);
 
-    result = eventfunc (pad, parent, event);
+  if (sticky) {
+    if (result)
+      /* after the event function accepted the event, we can store the sticky
+       * event on the pad */
+      gst_pad_store_sticky_event (pad, event, FALSE);
 
-    RELEASE_PARENT (parent);
+    gst_event_unref (event);
   }
 
   if (need_unlock)
@@ -4295,14 +4226,12 @@ wrong_direction:
   {
     g_warning ("pad %s:%s sending %s event in wrong direction",
         GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event));
-    GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
     return FALSE;
   }
 unknown_direction:
   {
     g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad));
-    GST_OBJECT_UNLOCK (pad);
     gst_event_unref (event);
     return FALSE;
   }
@@ -4316,6 +4245,15 @@ no_function:
     gst_event_unref (event);
     return FALSE;
   }
+precheck_failed:
+  {
+    GST_DEBUG_OBJECT (pad, "pre event check failed");
+    RELEASE_PARENT (parent);
+    if (need_unlock)
+      GST_PAD_STREAM_UNLOCK (pad);
+    gst_event_unref (event);
+    return FALSE;
+  }
 no_parent:
   {
     GST_DEBUG_OBJECT (pad, "no parent");
@@ -4344,19 +4282,8 @@ probe_stopped:
     gst_event_unref (event);
     return FALSE;
   }
-update_failed:
-  {
-    GST_OBJECT_UNLOCK (pad);
-    if (need_unlock)
-      GST_PAD_STREAM_UNLOCK (pad);
-    GST_CAT_INFO_OBJECT (GST_CAT_EVENT, pad, "Update events failed");
-    gst_event_unref (event);
-    return FALSE;
-  }
 }
 
-
-
 /**
  * gst_pad_set_element_private:
  * @pad: the #GstPad to set the private data of.
index 178948c..7ee870d 100644 (file)
@@ -578,7 +578,7 @@ typedef GstFlowReturn  (*GstPadStickyEventsForeachFunction) (GstPad *pad, GstEve
  *                            The flag has to be unset manually after
  *                            reconfiguration happened.
  *                            Since: 0.10.34.
- * @GST_PAD_FLAG_NEED_EVENTS: the pad has pending events
+ * @GST_PAD_FLAG_PENDING_EVENTS: the pad has pending events
  * @GST_PAD_FLAG_FIXED_CAPS: the pad is using fixed caps this means that once the
  *                      caps are set on the pad, the caps query function only
  *                      returns those caps.
@@ -596,7 +596,7 @@ typedef enum {
   GST_PAD_FLAG_FLUSHING         = (GST_OBJECT_FLAG_LAST << 1),
   GST_PAD_FLAG_BLOCKING         = (GST_OBJECT_FLAG_LAST << 2),
   GST_PAD_FLAG_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 3),
-  GST_PAD_FLAG_NEED_EVENTS      = (GST_OBJECT_FLAG_LAST << 4),
+  GST_PAD_FLAG_PENDING_EVENTS   = (GST_OBJECT_FLAG_LAST << 4),
   GST_PAD_FLAG_FIXED_CAPS       = (GST_OBJECT_FLAG_LAST << 5),
   GST_PAD_FLAG_PROXY_CAPS       = (GST_OBJECT_FLAG_LAST << 6),
   GST_PAD_FLAG_NEED_PARENT      = (GST_OBJECT_FLAG_LAST << 7),
@@ -725,7 +725,7 @@ struct _GstPadClass {
 #define GST_PAD_UNSET_FLUSHING(pad)    (GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_FLUSHING))
 
 #define GST_PAD_NEEDS_RECONFIGURE(pad)  (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE))
-#define GST_PAD_NEEDS_EVENTS(pad)       (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_EVENTS))
+#define GST_PAD_HAS_PENDING_EVENTS(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_PENDING_EVENTS))
 #define GST_PAD_IS_FIXED_CAPS(pad)     (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_FIXED_CAPS))
 #define GST_PAD_NEEDS_PARENT(pad)       (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_PARENT))