From dd65aae9a177f7b11dcef0f690a78d698f667cd4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 22 Nov 2011 18:32:51 +0100 Subject: [PATCH] pad: rework sticky events Rewrite sticky events, trying to make it a bit more simple. When sticky events are pushed on a srcpad, store them in the sticky event array and mark the event with received = FALSE. When the sticky event is successfully sent to the peer pad, make received = TRUE. Keep a PENDING_EVENTS pad flag that is set when one of the events is in the received = FALSE state for some reason. when activating a sinkpad, mark all events received = FALSE on the peer srcpad. When pushing a buffer, check the PENDING_EVENTS flag and if it is set, push all events to the peer pad first. --- gst/gstpad.c | 735 +++++++++++++++++++++++++++-------------------------------- gst/gstpad.h | 6 +- 2 files changed, 334 insertions(+), 407 deletions(-) diff --git a/gst/gstpad.c b/gst/gstpad.c index 4c5b2aa..79b454c 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -101,7 +101,7 @@ enum * moved to the active event when the eventfunc returned TRUE. */ typedef struct { - GstEvent *pending; + gboolean received; GstEvent *event; } PadEvent; @@ -325,92 +325,66 @@ gst_pad_init (GstPad * pad) } static void -clear_event (PadEvent events[], guint idx) +clear_event (GstPad * pad, guint idx) { - gst_event_replace (&events[idx].event, NULL); - gst_event_replace (&events[idx].pending, NULL); + gst_event_replace (&pad->priv->events[idx].event, NULL); + pad->priv->events[idx].received = FALSE; } /* called when setting the pad inactive. It removes all sticky events from * the pad */ static void -clear_events (PadEvent events[]) +clear_events (GstPad * pad) { guint i; for (i = 0; i < GST_EVENT_MAX_STICKY; i++) - clear_event (events, i); + clear_event (pad, i); + + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS); } -/* The sticky event with @idx from the srcpad is copied to the - * pending event on the sinkpad (when different). - * This function applies the pad offsets in case of segment events. - * This will make sure that we send the event to the sinkpad event - * function when the next buffer of event arrives. - * Should be called with the OBJECT lock of both pads. - * This function returns TRUE when there is a pending event on the - * sinkpad */ static gboolean -replace_event (GstPad * srcpad, GstPad * sinkpad, guint idx) +reschedule_event (GstPad * pad, guint idx) { - PadEvent *srcev, *sinkev; - GstEvent *event; - gboolean pending = FALSE; - - srcev = &srcpad->priv->events[idx]; - - if ((event = srcev->event)) { - sinkev = &sinkpad->priv->events[idx]; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEGMENT: - { - GstSegment segment; - gint64 offset; - - offset = srcpad->offset + sinkpad->offset; - if (offset != 0) { - gst_event_copy_segment (event, &segment); - /* adjust the base time. FIXME, check negative times, try to tweak the - * start to do clipping on negative times */ - segment.base += offset; - /* make a new event from the updated segment */ - event = gst_event_new_segment (&segment); - } - break; - } - default: - break; - } - if (sinkev->event != event) { - /* put in the pending entry when different */ - GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, srcpad, - "Putting event %p (%s) on pad %s:%s", event, - GST_EVENT_TYPE_NAME (event), GST_DEBUG_PAD_NAME (sinkpad)); - gst_event_replace (&sinkev->pending, event); - pending = TRUE; - } + if (pad->priv->events[idx].event) { + pad->priv->events[idx].received = FALSE; + return TRUE; } - return pending; + return FALSE; } - -/* srcpad and sinkpad LOCK must be taken */ static void -prepare_event_update (GstPad * srcpad, GstPad * sinkpad) +reschedule_events (GstPad * pad) { - gboolean pending; - gint i; + guint i; + gboolean pending = FALSE; - /* make sure we push the events from the source to this new peer, for this we - * copy the events on the sinkpad and mark EVENTS_PENDING */ - pending = FALSE; for (i = 0; i < GST_EVENT_MAX_STICKY; i++) - pending |= replace_event (srcpad, sinkpad, i); + pending |= reschedule_event (pad, i); - /* we had some new pending events, set our flag */ if (pending) - GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_NEED_EVENTS); + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); +} + + +/* should be called with LOCK */ +static GstEvent * +apply_pad_offset (GstPad * pad, GstEvent * event) +{ + /* check if we need to adjust the segment */ + if (pad->offset != 0) { + GstSegment segment; + + /* copy segment values */ + gst_event_copy_segment (event, &segment); + gst_event_unref (event); + + /* adjust and make a new event with the offset applied */ + segment.base += pad->offset; + event = gst_event_new_segment (&segment); + } + return event; } /* should be called with the OBJECT_LOCK */ @@ -453,7 +427,7 @@ gst_pad_dispose (GObject * object) gst_pad_set_pad_template (pad, NULL); - clear_events (pad->priv->events); + clear_events (pad); g_hook_list_clear (&pad->probes); @@ -676,7 +650,27 @@ pre_activate (GstPad * pad, GstPadMode new_mode) GST_DEBUG_OBJECT (pad, "setting PAD_MODE %d, unset flushing", new_mode); GST_PAD_UNSET_FLUSHING (pad); GST_PAD_MODE (pad) = new_mode; - GST_OBJECT_UNLOCK (pad); + if (GST_PAD_IS_SINK (pad)) { + GstPad *peer; + /* make sure the peer src pad sends us all events */ + if ((peer = GST_PAD_PEER (pad))) { + gst_object_ref (peer); + GST_OBJECT_UNLOCK (pad); + + GST_DEBUG_OBJECT (pad, "reschedule events on peer %s:%s", + GST_DEBUG_PAD_NAME (peer)); + + GST_OBJECT_LOCK (peer); + reschedule_events (peer); + GST_OBJECT_UNLOCK (peer); + + gst_object_unref (peer); + } else { + GST_OBJECT_UNLOCK (pad); + } + } else { + GST_OBJECT_UNLOCK (pad); + } break; } } @@ -690,7 +684,7 @@ post_activate (GstPad * pad, GstPadMode new_mode) GST_PAD_STREAM_LOCK (pad); GST_DEBUG_OBJECT (pad, "stopped streaming"); GST_OBJECT_LOCK (pad); - clear_events (pad->priv->events); + clear_events (pad); GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); break; @@ -1477,7 +1471,6 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad) { gboolean result = FALSE; GstElement *parent = NULL; - gint i; g_return_val_if_fail (GST_IS_PAD (srcpad), FALSE); g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), FALSE); @@ -1519,10 +1512,6 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad) GST_PAD_PEER (srcpad) = NULL; GST_PAD_PEER (sinkpad) = NULL; - /* clear pending caps if any */ - for (i = 0; i < GST_EVENT_MAX_STICKY; i++) - gst_event_replace (&sinkpad->priv->events[i].pending, NULL); - GST_OBJECT_UNLOCK (sinkpad); GST_OBJECT_UNLOCK (srcpad); @@ -1847,6 +1836,7 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) GstPadLinkReturn result; GstElement *parent; GstPadLinkFunction srcfunc, sinkfunc; + guint idx; g_return_val_if_fail (GST_IS_PAD (srcpad), GST_PAD_LINK_REFUSED); g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), GST_PAD_LINK_WRONG_DIRECTION); @@ -1876,8 +1866,18 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) GST_PAD_PEER (srcpad) = sinkpad; GST_PAD_PEER (sinkpad) = srcpad; - /* make sure we update events */ - prepare_event_update (srcpad, sinkpad); + /* check events, when something is different, mark pending */ + for (idx = 0; idx < GST_EVENT_MAX_STICKY; idx++) { + PadEvent *srcev, *sinkev; + + srcev = &srcpad->priv->events[idx]; + sinkev = &sinkpad->priv->events[idx]; + + if (srcev->event != sinkev->event) { + GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PENDING_EVENTS); + break; + } + } /* get the link functions */ srcfunc = GST_PAD_LINKFUNC (srcpad); @@ -2100,131 +2100,6 @@ gst_pad_set_caps (GstPad * pad, GstCaps * caps) return res; } -static gboolean -do_event_function (GstPad * pad, GstObject * parent, GstEvent * event, - GstPadEventFunction eventfunc, gboolean * caps_notify) -{ - gboolean result = TRUE, call_event = TRUE; - GstCaps *caps, *old, *templ; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_CAPS: - { - /* backwards compatibility mode for caps */ - gst_event_parse_caps (event, &caps); - - /* See if pad accepts the caps */ - templ = gst_pad_get_pad_template_caps (pad); - if (!gst_caps_is_subset (caps, templ)) - goto not_accepted; - - /* check if it changed */ - if ((old = gst_pad_get_current_caps (pad))) { - call_event = !gst_caps_is_equal (caps, old); - gst_caps_unref (old); - } - if (call_event) - *caps_notify = TRUE; - gst_caps_unref (templ); - break; - } - default: - break; - } - - if (call_event) { - GST_DEBUG_OBJECT (pad, "calling event function with event %p", event); - result = eventfunc (pad, parent, event); - } else { - gst_event_unref (event); - } - return result; - - /* ERRORS */ -not_accepted: - { - gst_caps_unref (templ); - gst_event_unref (event); - GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, - "caps %" GST_PTR_FORMAT " not accepted", caps); - return FALSE; - } -} - -/* function to send all pending events on the sinkpad to the event - * function and collect the results. This function should be called with - * the object lock. The object lock might be released by this function. - */ -static GstFlowReturn -gst_pad_update_events (GstPad * pad) -{ - GstObject *parent; - GstFlowReturn ret = GST_FLOW_OK; - guint i; - GstPadEventFunction eventfunc; - GstEvent *event; - gboolean caps_notify = FALSE; - PadEvent *ev; - - if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL)) - goto no_function; - - for (i = 0; i < GST_EVENT_MAX_STICKY; i++) { - gboolean res; - - ev = &pad->priv->events[i]; - - /* skip without pending event */ - if ((event = gst_event_steal (&ev->pending)) == NULL) - continue; - - ACQUIRE_PARENT (pad, parent, no_parent); - - gst_event_ref (event); - GST_OBJECT_UNLOCK (pad); - - res = do_event_function (pad, parent, event, eventfunc, &caps_notify); - - RELEASE_PARENT (parent); - - /* things could have changed while we release the lock, check if we still - * are handling the same event, if we don't something changed and we have - * to try again. FIXME. we need a cookie here. FIXME, we also want to remove - * that lock eventually and then do the retry elsewhere. */ - - if (res) { - /* make the event active */ - gst_event_take (&ev->event, event); - - /* notify caps change when needed */ - if (caps_notify) { - g_object_notify_by_pspec ((GObject *) pad, pspec_caps); - caps_notify = FALSE; - } - } else { - gst_event_unref (event); - ret = GST_FLOW_ERROR; - } - GST_OBJECT_LOCK (pad); - } - /* when we get here all events were successfully updated. */ - return ret; - - /* ERRORS */ -no_function: - { - g_warning ("pad %s:%s has no event handler, file a bug.", - GST_DEBUG_PAD_NAME (pad)); - return GST_FLOW_NOT_SUPPORTED; - } -no_parent: - { - GST_DEBUG_OBJECT (pad, "pad has no parent"); - gst_event_take (&ev->pending, event); - return GST_FLOW_WRONG_STATE; - } -} - /** * gst_pad_get_pad_template_caps: * @pad: a #GstPad to get the template capabilities from. @@ -3030,8 +2905,6 @@ void gst_pad_set_offset (GstPad * pad, gint64 offset) { guint idx; - GstPad *peer; - GstPad *tmp = NULL; g_return_if_fail (GST_IS_PAD (pad)); @@ -3041,42 +2914,21 @@ gst_pad_set_offset (GstPad * pad, gint64 offset) goto done; pad->offset = offset; + GST_DEBUG_OBJECT (pad, "changed offset to %" G_GINT64_FORMAT, offset); - /* if no peer, we just updated the offset */ - if ((peer = GST_PAD_PEER (pad)) == NULL) + /* sinkpads will apply their offset the next time a segment + * event is received. */ + if (GST_PAD_IS_SINK (pad)) goto done; - /* switch pads around when dealing with a sinkpad */ - if (GST_PAD_IS_SINK (pad)) { - /* ref the peer so it doesn't go away when we release the lock */ - tmp = gst_object_ref (peer); - /* make sure we get the peer (the srcpad) */ - GST_OBJECT_UNLOCK (pad); - - /* swap pads */ - peer = pad; - pad = tmp; - - GST_OBJECT_LOCK (pad); - /* check if the pad didn't get relinked */ - if (GST_PAD_PEER (pad) != peer) - goto done; - - /* we can release the ref now */ - gst_object_unref (peer); - } - /* the index of the segment event in the array */ idx = GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_SEGMENT); - /* lock order is srcpad >> sinkpad */ - GST_OBJECT_LOCK (peer); - /* take the current segment event, adjust it and then place - * it on the sinkpad. events on the srcpad are always active. */ - if (replace_event (pad, peer, idx)) - GST_OBJECT_FLAG_SET (peer, GST_PAD_FLAG_NEED_EVENTS); - - GST_OBJECT_UNLOCK (peer); + /* resend the last segment event on next buffer push */ + if (pad->priv->events[idx].event) { + pad->priv->events[idx].received = FALSE; + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); + } done: GST_OBJECT_UNLOCK (pad); @@ -3257,6 +3109,56 @@ probe_stopped: * Data passing functions */ +/* should be called with the LOCK */ +static inline GstFlowReturn +gst_pad_push_sticky_events (GstPad * pad) +{ + GstFlowReturn ret; + gint i; + +restart: + ret = GST_FLOW_OK; + + GST_DEBUG_OBJECT (pad, "pushing out all sticky events"); + for (i = 0; i < GST_EVENT_MAX_STICKY; i++) { + gboolean res; + PadEvent *ev; + GstEvent *event; + + ev = &pad->priv->events[i]; + + /* skip without active event */ + if ((event = ev->event) == NULL) + continue; + + if (ev->received) { + GST_DEBUG_OBJECT (pad, "event %s was already received", + GST_EVENT_TYPE_NAME (event)); + continue; + } + + gst_event_ref (event); + GST_OBJECT_UNLOCK (pad); + + res = gst_pad_push_event (pad, event); + + GST_OBJECT_LOCK (pad); + if (!res) { + ret = GST_FLOW_ERROR; + break; + } + /* things could have changed while we release the lock, check if we still + * are handling the same event, if we don't something changed and we have + * to try again. FIXME. we need a cookie here. */ + if (event != ev->event) { + GST_DEBUG_OBJECT (pad, "events changed, restarting"); + goto restart; + } + } + + return ret; +} + /* this is the chain function that does not perform the additional argument * checking for that little extra speed. */ @@ -3265,7 +3167,6 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) { GstFlowReturn ret; GstObject *parent; - gboolean needs_events; GST_PAD_STREAM_LOCK (pad); @@ -3273,16 +3174,6 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; - needs_events = GST_PAD_NEEDS_EVENTS (pad); - if (G_UNLIKELY (needs_events)) { - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS); - - GST_DEBUG_OBJECT (pad, "need to update all events"); - ret = gst_pad_update_events (pad); - if (G_UNLIKELY (ret != GST_FLOW_OK)) - goto events_error; - } - PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped); PROBE_PUSH (pad, type, data, probe_stopped); @@ -3341,14 +3232,6 @@ flushing: gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_WRONG_STATE; } -events_error: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted"); - GST_OBJECT_UNLOCK (pad); - GST_PAD_STREAM_UNLOCK (pad); - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); - return ret; - } probe_stopped: { GST_OBJECT_UNLOCK (pad); @@ -3491,6 +3374,13 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; + if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) { + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS); + + if ((ret = gst_pad_push_sticky_events (pad))) + goto events_error; + } + /* do block probes */ PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped); @@ -3530,6 +3420,13 @@ flushing: gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_WRONG_STATE; } +events_error: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "error pushing events"); + GST_OBJECT_UNLOCK (pad); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + return ret; + } probe_stopped: { GST_OBJECT_UNLOCK (pad); @@ -3640,6 +3537,13 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size, if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; + if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) { + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS); + + if ((ret = gst_pad_push_sticky_events (pad))) + goto events_error; + } + /* when one of the probes returns a buffer, probed_data will be called and we * skip calling the getrange function */ PROBE_PRE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK, @@ -3683,6 +3587,13 @@ flushing: GST_PAD_STREAM_UNLOCK (pad); return GST_FLOW_WRONG_STATE; } +events_error: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "error pushing events"); + GST_OBJECT_UNLOCK (pad); + GST_PAD_STREAM_UNLOCK (pad); + return ret; + } no_parent: { GST_DEBUG_OBJECT (pad, "no parent"); @@ -3793,7 +3704,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, { GstPad *peer; GstFlowReturn ret; - gboolean needs_events; g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR); @@ -3834,15 +3744,6 @@ probed_data: PROBE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BUFFER, *buffer, offset, size, post_probe_stopped); - needs_events = GST_PAD_NEEDS_EVENTS (pad); - if (G_UNLIKELY (needs_events)) { - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS); - - GST_DEBUG_OBJECT (pad, "we need to update the events"); - ret = gst_pad_update_events (pad); - if (G_UNLIKELY (ret != GST_FLOW_OK)) - goto events_error; - } GST_OBJECT_UNLOCK (pad); return ret; @@ -3888,15 +3789,37 @@ post_probe_stopped: *buffer = NULL; return ret; } -events_error: - { - GST_OBJECT_UNLOCK (pad); - gst_buffer_unref (*buffer); - *buffer = NULL; - GST_CAT_WARNING_OBJECT (GST_CAT_SCHEDULING, pad, - "pullrange returned events that were not accepted"); - return ret; +} + +static gboolean +gst_pad_store_sticky_event (GstPad * pad, GstEvent * event, gboolean locked) +{ + guint idx; + gboolean ret; + + idx = GST_EVENT_STICKY_IDX (event); + + ret = gst_event_replace (&pad->priv->events[idx].event, event); + + if (ret) { + GST_LOG_OBJECT (pad, "stored sticky event %s at index %u", + GST_EVENT_TYPE_NAME (event), idx); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + if (locked) + GST_OBJECT_UNLOCK (pad); + + GST_DEBUG_OBJECT (pad, "notify caps"); + g_object_notify_by_pspec ((GObject *) pad, pspec_caps); + + if (locked) + GST_OBJECT_LOCK (pad); + break; + default: + break; + } } + return ret; } /** @@ -3923,18 +3846,29 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) gboolean result; gboolean stored = FALSE; GstPadProbeType type; + guint idx = 0; + gboolean sticky; g_return_val_if_fail (GST_IS_PAD (pad), FALSE); g_return_val_if_fail (event != NULL, FALSE); g_return_val_if_fail (GST_IS_EVENT (event), FALSE); - if (GST_EVENT_IS_DOWNSTREAM (event)) + if (GST_PAD_IS_SRC (pad)) { + if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event))) + goto wrong_direction; + if ((sticky = GST_EVENT_IS_STICKY (event))) + idx = GST_EVENT_STICKY_IDX (event); type = GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM; - else + } else if (GST_PAD_IS_SINK (pad)) { + if (G_UNLIKELY (!GST_EVENT_IS_UPSTREAM (event))) + goto wrong_direction; + /* events pushed on sinkpad never are sticky */ + sticky = FALSE; type = GST_PAD_PROBE_TYPE_EVENT_UPSTREAM; + } else + goto unknown_direction; GST_OBJECT_LOCK (pad); - peerpad = GST_PAD_PEER (pad); /* Two checks to be made: @@ -3959,8 +3893,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) /* Remove sticky EOS events */ GST_LOG_OBJECT (pad, "Removing pending EOS events"); - clear_event (pad->priv->events, - GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS)); + clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS)); if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) { GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop"); @@ -3972,53 +3905,29 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushed; - /* store the event on the pad, but only on srcpads */ - if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) { - guint idx; - - idx = GST_EVENT_STICKY_IDX (event); - GST_LOG_OBJECT (pad, "storing sticky event %s at index %u", - GST_EVENT_TYPE_NAME (event), idx); - - /* srcpad sticky events always become active immediately */ - gst_event_replace (&pad->priv->events[idx].event, event); - + /* store the event on the pad, but only on srcpads. We always store the + * event exactly how it was sent */ + if (sticky) { + /* srcpad sticky events are store immediately, we set the pending flag + * to TRUE, it will be set to FALSE when we can successfully push the + * event to the peer pad */ + if (gst_pad_store_sticky_event (pad, event, TRUE)) { + GST_DEBUG_OBJECT (pad, "event %s updated", + GST_EVENT_TYPE_NAME (event)); + pad->priv->events[idx].received = FALSE; + } + /* the peerpad might have changed. Things we checked above could not + * have changed. */ + peerpad = GST_PAD_PEER (pad); stored = TRUE; } - /* backwards compatibility mode for caps */ switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_CAPS: - { - GST_OBJECT_UNLOCK (pad); - - g_object_notify_by_pspec ((GObject *) pad, pspec_caps); - - GST_OBJECT_LOCK (pad); - /* the peerpad might have changed. Things we checked above could not - * have changed. */ - peerpad = GST_PAD_PEER (pad); - break; - } case GST_EVENT_SEGMENT: - { - gint64 offset; - - offset = pad->offset; - /* check if we need to adjust the segment */ - if (offset != 0 && (peerpad != NULL)) { - GstSegment segment; - - /* copy segment values */ - gst_event_copy_segment (event, &segment); - gst_event_unref (event); - - /* adjust and make a new event with the offset applied */ - segment.base += offset; - event = gst_event_new_segment (&segment); - } + /* pass the adjusted segment event on. We need to do this even if + * there is no peer pad because of the probes. */ + event = apply_pad_offset (pad, event); break; - } case GST_EVENT_RECONFIGURE: if (GST_PAD_IS_SINK (pad)) GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE); @@ -4026,10 +3935,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) default: break; } - PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped); - break; } } @@ -4059,17 +3966,39 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) gst_object_unref (peerpad); GST_OBJECT_LOCK (pad); + if (sticky) { + if (result) { + pad->priv->events[idx].received = TRUE; + GST_DEBUG_OBJECT (pad, "event cleared pending"); + } else { + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); + GST_DEBUG_OBJECT (pad, "mark pending events"); + } + } pad->priv->using--; if (pad->priv->using == 0) { /* pad is not active anymore, trigger idle callbacks */ PROBE_NO_DATA (pad, GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_IDLE, - probe_stopped, GST_FLOW_OK); + idle_probe_stopped, GST_FLOW_OK); } GST_OBJECT_UNLOCK (pad); return result | stored; /* ERROR handling */ +wrong_direction: + { + g_warning ("pad %s:%s pushing %s event in wrong direction", + GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event)); + gst_event_unref (event); + return FALSE; + } +unknown_direction: + { + g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad)); + gst_event_unref (event); + return FALSE; + } flushed: { GST_DEBUG_OBJECT (pad, "We're flushing"); @@ -4080,6 +4009,14 @@ flushed: probe_stopped: { GST_DEBUG_OBJECT (pad, "Probe returned %s", gst_flow_get_name (ret)); + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); + GST_OBJECT_UNLOCK (pad); + gst_event_unref (event); + return stored; + } +idle_probe_stopped: + { + GST_DEBUG_OBJECT (pad, "Idle probe returned %s", gst_flow_get_name (ret)); GST_OBJECT_UNLOCK (pad); gst_event_unref (event); return stored; @@ -4087,12 +4024,48 @@ probe_stopped: not_linked: { GST_DEBUG_OBJECT (pad, "Dropping event because pad is not linked"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); GST_OBJECT_UNLOCK (pad); gst_event_unref (event); return stored; } } +/* Check if we can call the event function with the given event */ +static GstFlowReturn +pre_eventfunc_check (GstPad * pad, GstEvent * event) +{ + GstCaps *caps, *templ; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS: + { + /* backwards compatibility mode for caps */ + gst_event_parse_caps (event, &caps); + + /* See if pad accepts the caps */ + templ = gst_pad_get_pad_template_caps (pad); + if (!gst_caps_is_subset (caps, templ)) + goto not_accepted; + + gst_caps_unref (templ); + break; + } + default: + break; + } + return GST_FLOW_OK; + + /* ERRORS */ +not_accepted: + { + gst_caps_unref (templ); + GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, + "caps %" GST_PTR_FORMAT " not accepted", caps); + return GST_FLOW_NOT_NEGOTIATED; + } +} + /** * gst_pad_send_event: * @pad: a #GstPad to send the event to. @@ -4126,13 +4099,14 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) { GstFlowReturn ret; gboolean result = FALSE; - gboolean serialized, need_unlock = FALSE, needs_events, sticky; + gboolean serialized, need_unlock = FALSE, sticky; + GstPadEventFunction eventfunc; + GstObject *parent; GstPadProbeType type; g_return_val_if_fail (GST_IS_PAD (pad), FALSE); g_return_val_if_fail (event != NULL, FALSE); - GST_OBJECT_LOCK (pad); if (GST_PAD_IS_SINK (pad)) { if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event))) goto wrong_direction; @@ -4148,10 +4122,7 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) } else goto unknown_direction; - /* get the flag first, we clear it when we have a FLUSH or a non-serialized - * event. */ - needs_events = GST_PAD_NEEDS_EVENTS (pad); - + GST_OBJECT_LOCK (pad); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, @@ -4163,7 +4134,6 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) GST_PAD_SET_FLUSHING (pad); GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag"); - needs_events = FALSE; break; case GST_EVENT_FLUSH_STOP: if (G_LIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_NONE)) { @@ -4172,15 +4142,15 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) } /* Remove pending EOS events */ GST_LOG_OBJECT (pad, "Removing pending EOS events"); - clear_event (pad->priv->events, - GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS)); + clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS)); GST_OBJECT_UNLOCK (pad); /* grab stream lock */ GST_PAD_STREAM_LOCK (pad); need_unlock = TRUE; GST_OBJECT_LOCK (pad); - needs_events = FALSE; + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; break; case GST_EVENT_RECONFIGURE: if (GST_PAD_IS_SRC (pad)) @@ -4198,89 +4168,50 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) GST_PAD_STREAM_LOCK (pad); need_unlock = TRUE; GST_OBJECT_LOCK (pad); - } else { - /* don't forward events on non-serialized events */ - needs_events = FALSE; + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; } - /* store the event on the pad, but only on srcpads. We need to store the - * event before checking the flushing flag. */ - if (sticky) { - guint idx; - PadEvent *ev; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEGMENT: - if (pad->offset != 0) { - GstSegment segment; - - /* copy segment values */ - gst_event_copy_segment (event, &segment); - gst_event_unref (event); - - /* adjust and make a new event with the offset applied */ - segment.base += pad->offset; - event = gst_event_new_segment (&segment); - } - break; - default: - break; - } - - idx = GST_EVENT_STICKY_IDX (event); - ev = &pad->priv->events[idx]; - - if (ev->event != event) { - GST_LOG_OBJECT (pad, "storing sticky event %s at index %u", - GST_EVENT_TYPE_NAME (event), idx); - gst_event_replace (&ev->pending, event); - /* set the flag so that we update the events next time. We would - * usually update below but we might be flushing too. */ - GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_EVENTS); - needs_events = TRUE; - } + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + event = apply_pad_offset (pad, event); + break; + default: + break; } + /* now do the probe */ PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped); PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH, event, probe_stopped); - break; } - if (G_UNLIKELY (needs_events)) { - GstFlowReturn ret; - - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_NEED_EVENTS); - - GST_DEBUG_OBJECT (pad, "need to update all events"); - ret = gst_pad_update_events (pad); - if (ret != GST_FLOW_OK) - goto update_failed; - GST_OBJECT_UNLOCK (pad); + if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL)) + goto no_function; - gst_event_unref (event); + ACQUIRE_PARENT (pad, parent, no_parent); + GST_OBJECT_UNLOCK (pad); - result = TRUE; - } + if (G_UNLIKELY (pre_eventfunc_check (pad, event) != GST_FLOW_OK)) + goto precheck_failed; - /* ensure to pass on event; - * note that a sticky event has already been updated above */ - if (G_LIKELY (!needs_events || !sticky)) { - GstPadEventFunction eventfunc; - GstObject *parent; + if (sticky) + gst_event_ref (event); - if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL)) - goto no_function; + result = eventfunc (pad, parent, event); - ACQUIRE_PARENT (pad, parent, no_parent); - GST_OBJECT_UNLOCK (pad); + RELEASE_PARENT (parent); - result = eventfunc (pad, parent, event); + if (sticky) { + if (result) + /* after the event function accepted the event, we can store the sticky + * event on the pad */ + gst_pad_store_sticky_event (pad, event, FALSE); - RELEASE_PARENT (parent); + gst_event_unref (event); } if (need_unlock) @@ -4295,14 +4226,12 @@ wrong_direction: { g_warning ("pad %s:%s sending %s event in wrong direction", GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event)); - GST_OBJECT_UNLOCK (pad); gst_event_unref (event); return FALSE; } unknown_direction: { g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad)); - GST_OBJECT_UNLOCK (pad); gst_event_unref (event); return FALSE; } @@ -4316,6 +4245,15 @@ no_function: gst_event_unref (event); return FALSE; } +precheck_failed: + { + GST_DEBUG_OBJECT (pad, "pre event check failed"); + RELEASE_PARENT (parent); + if (need_unlock) + GST_PAD_STREAM_UNLOCK (pad); + gst_event_unref (event); + return FALSE; + } no_parent: { GST_DEBUG_OBJECT (pad, "no parent"); @@ -4344,19 +4282,8 @@ probe_stopped: gst_event_unref (event); return FALSE; } -update_failed: - { - GST_OBJECT_UNLOCK (pad); - if (need_unlock) - GST_PAD_STREAM_UNLOCK (pad); - GST_CAT_INFO_OBJECT (GST_CAT_EVENT, pad, "Update events failed"); - gst_event_unref (event); - return FALSE; - } } - - /** * gst_pad_set_element_private: * @pad: the #GstPad to set the private data of. diff --git a/gst/gstpad.h b/gst/gstpad.h index 178948c..7ee870d 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -578,7 +578,7 @@ typedef GstFlowReturn (*GstPadStickyEventsForeachFunction) (GstPad *pad, GstEve * The flag has to be unset manually after * reconfiguration happened. * Since: 0.10.34. - * @GST_PAD_FLAG_NEED_EVENTS: the pad has pending events + * @GST_PAD_FLAG_PENDING_EVENTS: the pad has pending events * @GST_PAD_FLAG_FIXED_CAPS: the pad is using fixed caps this means that once the * caps are set on the pad, the caps query function only * returns those caps. @@ -596,7 +596,7 @@ typedef enum { GST_PAD_FLAG_FLUSHING = (GST_OBJECT_FLAG_LAST << 1), GST_PAD_FLAG_BLOCKING = (GST_OBJECT_FLAG_LAST << 2), GST_PAD_FLAG_NEED_RECONFIGURE = (GST_OBJECT_FLAG_LAST << 3), - GST_PAD_FLAG_NEED_EVENTS = (GST_OBJECT_FLAG_LAST << 4), + GST_PAD_FLAG_PENDING_EVENTS = (GST_OBJECT_FLAG_LAST << 4), GST_PAD_FLAG_FIXED_CAPS = (GST_OBJECT_FLAG_LAST << 5), GST_PAD_FLAG_PROXY_CAPS = (GST_OBJECT_FLAG_LAST << 6), GST_PAD_FLAG_NEED_PARENT = (GST_OBJECT_FLAG_LAST << 7), @@ -725,7 +725,7 @@ struct _GstPadClass { #define GST_PAD_UNSET_FLUSHING(pad) (GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_FLUSHING)) #define GST_PAD_NEEDS_RECONFIGURE(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE)) -#define GST_PAD_NEEDS_EVENTS(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_EVENTS)) +#define GST_PAD_HAS_PENDING_EVENTS(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_PENDING_EVENTS)) #define GST_PAD_IS_FIXED_CAPS(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_FIXED_CAPS)) #define GST_PAD_NEEDS_PARENT(pad) (GST_OBJECT_FLAG_IS_SET (pad, GST_PAD_FLAG_NEED_PARENT)) -- 2.7.4