X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fgstpad.c;h=12eb561f0d6106887c183171e3fb76b6b82c05bc;hb=dc5a62f70234f729c0f3443ab725e9f0232cc1fe;hp=dfa1eeabbe41c6ca514e54ba3feacf3571500828;hpb=08167e3234a3fb277bccb2de6370ff14e218616d;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/gstpad.c b/gst/gstpad.c index dfa1eea..12eb561 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -21,6 +21,7 @@ */ /** * SECTION:gstpad + * @title: GstPad * @short_description: Object contained by elements that allows links to * other elements * @see_also: #GstPadTemplate, #GstElement, #GstEvent, #GstQuery, #GstBuffer @@ -94,6 +95,7 @@ #include "gstutils.h" #include "gstinfo.h" #include "gsterror.h" +#include "gsttracerutils.h" #include "gstvalue.h" #include "glib-compat-private.h" @@ -119,8 +121,7 @@ enum /* FILL ME */ }; -#define GST_PAD_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PAD, GstPadPrivate)) +#define _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH (GST_PAD_PROBE_TYPE_ALL_BOTH | GST_PAD_PROBE_TYPE_EVENT_FLUSH) /* we have a pending and an active event on the pad. On source pads only the * active event is used. On sinkpads, events are copied to the pending entry and @@ -139,16 +140,25 @@ struct _GstPadPrivate gint using; guint probe_list_cookie; - guint probe_cookie; + + /* counter of how many idle probes are running directly from the add_probe + * call. Used to block any data flowing in the pad while the idle callback + * Doesn't finish its work */ + gint idle_running; + + /* conditional and variable used to ensure pads only get (de)activated + * by a single thread at a time. Protected by the object lock */ + GCond activation_cond; + gboolean in_activation; }; typedef struct { GHook hook; - guint cookie; } GstProbe; -#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie) +#define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \ + (((GstPad *)(p))->priv->idle_running > 0) typedef struct { @@ -156,8 +166,13 @@ typedef struct GstPadProbeInfo *info; gboolean dropped; gboolean pass; + gboolean handled; gboolean marshalled; - guint cookie; + + gulong *called_probes; + guint n_called_probes; + guint called_probes_size; + gboolean retry; } ProbeMarshall; static void gst_pad_dispose (GObject * object); @@ -177,6 +192,9 @@ static GstFlowReturn gst_pad_send_event_unchecked (GstPad * pad, static GstFlowReturn gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event, GstPadProbeType type); +static gboolean activate_mode_internal (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); + static guint gst_pad_signals[LAST_SIGNAL] = { 0 }; static GParamSpec *pspec_caps = NULL; @@ -299,7 +317,8 @@ gst_pad_link_get_name (GstPadLinkReturn ret) } #define gst_pad_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT, _do_init); +G_DEFINE_TYPE_WITH_CODE (GstPad, gst_pad, GST_TYPE_OBJECT, + G_ADD_PRIVATE (GstPad) _do_init); static void gst_pad_class_init (GstPadClass * klass) @@ -310,8 +329,6 @@ gst_pad_class_init (GstPadClass * klass) gobject_class = G_OBJECT_CLASS (klass); gstobject_class = GST_OBJECT_CLASS (klass); - g_type_class_add_private (klass, sizeof (GstPadPrivate)); - gobject_class->dispose = gst_pad_dispose; gobject_class->finalize = gst_pad_finalize; gobject_class->set_property = gst_pad_set_property; @@ -381,7 +398,7 @@ gst_pad_class_init (GstPadClass * klass) static void gst_pad_init (GstPad * pad) { - pad->priv = GST_PAD_GET_PRIVATE (pad); + pad->priv = gst_pad_get_instance_private (pad); GST_PAD_DIRECTION (pad) = GST_PAD_UNKNOWN; @@ -402,6 +419,8 @@ gst_pad_init (GstPad * pad) pad->priv->events = g_array_sized_new (FALSE, TRUE, sizeof (PadEvent), 16); pad->priv->events_cookie = 0; pad->priv->last_cookie = -1; + g_cond_init (&pad->priv->activation_cond); + pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING; } @@ -463,6 +482,8 @@ find_event_by_type (GstPad * pad, GstEventType type, guint idx) if (idx == 0) goto found; idx--; + } else if (GST_EVENT_TYPE (ev->event) > type) { + break; } } ev = NULL; @@ -485,6 +506,8 @@ find_event (GstPad * pad, GstEvent * event) ev = &g_array_index (events, PadEvent, i); if (event == ev->event) goto found; + else if (GST_EVENT_TYPE (ev->event) > GST_EVENT_TYPE (event)) + break; } ev = NULL; found: @@ -508,7 +531,9 @@ remove_event_by_type (GstPad * pad, GstEventType type) if (ev->event == NULL) goto next; - if (GST_EVENT_TYPE (ev->event) != type) + if (GST_EVENT_TYPE (ev->event) > type) + break; + else if (GST_EVENT_TYPE (ev->event) != type) goto next; gst_event_unref (ev->event); @@ -576,7 +601,7 @@ restart: if (G_UNLIKELY (ev->event == NULL)) goto next; - /* take aditional ref, func might release the lock */ + /* take additional ref, func might release the lock */ ev_ret.event = gst_event_ref (ev->event); ev_ret.received = ev->received; @@ -619,12 +644,13 @@ restart: /* should be called with LOCK */ static GstEvent * -_apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream) +_apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream, + gint64 pad_offset) { gint64 offset; - GST_DEBUG_OBJECT (pad, "apply pad offset %" GST_TIME_FORMAT, - GST_TIME_ARGS (pad->offset)); + GST_DEBUG_OBJECT (pad, "apply pad offset %" GST_STIME_FORMAT, + GST_STIME_ARGS (pad_offset)); if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GstSegment segment; @@ -635,16 +661,16 @@ _apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream) gst_event_copy_segment (event, &segment); gst_event_unref (event); - gst_segment_offset_running_time (&segment, segment.format, pad->offset); + gst_segment_offset_running_time (&segment, segment.format, pad_offset); event = gst_event_new_segment (&segment); } event = gst_event_make_writable (event); offset = gst_event_get_running_time_offset (event); if (upstream) - offset -= pad->offset; + offset -= pad_offset; else - offset += pad->offset; + offset += pad_offset; gst_event_set_running_time_offset (event, offset); return event; @@ -654,11 +680,10 @@ static inline GstEvent * apply_pad_offset (GstPad * pad, GstEvent * event, gboolean upstream) { if (G_UNLIKELY (pad->offset != 0)) - return _apply_pad_offset (pad, event, upstream); + return _apply_pad_offset (pad, event, upstream, pad->offset); return event; } - /* should be called with the OBJECT_LOCK */ static GstCaps * get_pad_caps (GstPad * pad) @@ -679,7 +704,7 @@ gst_pad_dispose (GObject * object) GstPad *pad = GST_PAD_CAST (object); GstPad *peer; - GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, pad, "dispose"); + GST_CAT_DEBUG_OBJECT (GST_CAT_REFCOUNTING, pad, "%p dispose", pad); /* unlink the peer pad */ if ((peer = gst_pad_get_peer (pad))) { @@ -741,6 +766,7 @@ gst_pad_finalize (GObject * object) g_rec_mutex_clear (&pad->stream_rec_lock); g_cond_clear (&pad->block_cond); + g_cond_clear (&pad->priv->activation_cond); g_array_free (pad->priv->events, TRUE); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -806,8 +832,7 @@ gst_pad_get_property (GObject * object, guint prop_id, * will be assigned. * This function makes a copy of the name so you can safely free the name. * - * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in - * case of an error. + * Returns: (transfer floating): a new #GstPad. * * MT safe. */ @@ -828,15 +853,18 @@ gst_pad_new (const gchar * name, GstPadDirection direction) * will be assigned. * This function makes a copy of the name so you can safely free the name. * - * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in - * case of an error. + * Returns: (transfer floating): a new #GstPad. */ GstPad * gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name) { + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_PAD : GST_PAD_TEMPLATE_GTYPE (templ); + g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL); - return g_object_new (GST_TYPE_PAD, + return g_object_new (pad_type, "name", name, "direction", templ->direction, "template", templ, NULL); } @@ -850,8 +878,7 @@ gst_pad_new_from_template (GstPadTemplate * templ, const gchar * name) * will be assigned. * This function makes a copy of the name so you can safely free the name. * - * Returns: (transfer floating) (nullable): a new #GstPad, or %NULL in - * case of an error. + * Returns: (transfer floating): a new #GstPad. */ GstPad * gst_pad_new_from_static_template (GstStaticPadTemplate * templ, @@ -909,7 +936,9 @@ gst_pad_get_direction (GstPad * pad) static gboolean gst_pad_activate_default (GstPad * pad, GstObject * parent) { - return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE); + g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + + return activate_mode_internal (pad, parent, GST_PAD_MODE_PUSH, TRUE); } /** @@ -936,12 +965,22 @@ gst_pad_mode_get_name (GstPadMode mode) return "unknown"; } -static void +/* Returns TRUE if pad wasn't already in the new_mode */ +static gboolean pre_activate (GstPad * pad, GstPadMode new_mode) { switch (new_mode) { case GST_PAD_MODE_NONE: GST_OBJECT_LOCK (pad); + while (G_UNLIKELY (pad->priv->in_activation)) + g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad)); + if (new_mode == GST_PAD_MODE (pad)) { + GST_WARNING_OBJECT (pad, + "Pad is already in the process of being deactivated"); + GST_OBJECT_UNLOCK (pad); + return FALSE; + } + pad->priv->in_activation = TRUE; GST_DEBUG_OBJECT (pad, "setting PAD_MODE NONE, set flushing"); GST_PAD_SET_FLUSHING (pad); pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING; @@ -953,6 +992,15 @@ pre_activate (GstPad * pad, GstPadMode new_mode) case GST_PAD_MODE_PUSH: case GST_PAD_MODE_PULL: GST_OBJECT_LOCK (pad); + while (G_UNLIKELY (pad->priv->in_activation)) + g_cond_wait (&pad->priv->activation_cond, GST_OBJECT_GET_LOCK (pad)); + if (new_mode == GST_PAD_MODE (pad)) { + GST_WARNING_OBJECT (pad, + "Pad is already in the process of being activated"); + GST_OBJECT_UNLOCK (pad); + return FALSE; + } + pad->priv->in_activation = TRUE; GST_DEBUG_OBJECT (pad, "setting pad into %s mode, unset flushing", gst_pad_mode_get_name (new_mode)); GST_PAD_UNSET_FLUSHING (pad); @@ -981,6 +1029,7 @@ pre_activate (GstPad * pad, GstPadMode new_mode) } break; } + return TRUE; } static void @@ -988,6 +1037,11 @@ post_activate (GstPad * pad, GstPadMode new_mode) { switch (new_mode) { case GST_PAD_MODE_NONE: + GST_OBJECT_LOCK (pad); + pad->priv->in_activation = FALSE; + g_cond_broadcast (&pad->priv->activation_cond); + GST_OBJECT_UNLOCK (pad); + /* ensures that streaming stops */ GST_PAD_STREAM_LOCK (pad); GST_DEBUG_OBJECT (pad, "stopped streaming"); @@ -998,6 +1052,10 @@ post_activate (GstPad * pad, GstPadMode new_mode) break; case GST_PAD_MODE_PUSH: case GST_PAD_MODE_PULL: + GST_OBJECT_LOCK (pad); + pad->priv->in_activation = FALSE; + g_cond_broadcast (&pad->priv->activation_cond); + GST_OBJECT_UNLOCK (pad); /* NOP */ break; } @@ -1054,7 +1112,7 @@ gst_pad_set_active (GstPad * pad, gboolean active) } else { GST_DEBUG_OBJECT (pad, "deactivating pad from %s mode", gst_pad_mode_get_name (old)); - ret = gst_pad_activate_mode (pad, old, FALSE); + ret = activate_mode_internal (pad, parent, old, FALSE); if (ret) pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING; } @@ -1088,36 +1146,18 @@ failed: } } -/** - * gst_pad_activate_mode: - * @pad: the #GstPad to activate or deactivate. - * @mode: the requested activation mode - * @active: whether or not the pad should be active. - * - * Activates or deactivates the given pad in @mode via dispatching to the - * pad's activatemodefunc. For use from within pad activation functions only. - * - * If you don't know what this is, you probably don't want to call it. - * - * Returns: %TRUE if the operation was successful. - * - * MT safe. - */ -gboolean -gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active) +static gboolean +activate_mode_internal (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) { gboolean res = FALSE; - GstObject *parent; GstPadMode old, new; GstPadDirection dir; GstPad *peer; - g_return_val_if_fail (GST_IS_PAD (pad), FALSE); - GST_OBJECT_LOCK (pad); old = GST_PAD_MODE (pad); dir = GST_PAD_DIRECTION (pad); - ACQUIRE_PARENT (pad, parent, no_parent); GST_OBJECT_UNLOCK (pad); new = active ? mode : GST_PAD_MODE_NONE; @@ -1131,8 +1171,9 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active) GST_DEBUG_OBJECT (pad, "deactivating pad from %s mode", gst_pad_mode_get_name (old)); - if (G_UNLIKELY (!gst_pad_activate_mode (pad, old, FALSE))) + if (G_UNLIKELY (!activate_mode_internal (pad, parent, old, FALSE))) goto deactivate_failed; + old = GST_PAD_MODE_NONE; } switch (mode) { @@ -1167,17 +1208,21 @@ gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active) /* Mark pad as needing reconfiguration */ if (active) GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_NEED_RECONFIGURE); - pre_activate (pad, new); - if (GST_PAD_ACTIVATEMODEFUNC (pad)) { - if (G_UNLIKELY (!GST_PAD_ACTIVATEMODEFUNC (pad) (pad, parent, mode, - active))) - goto failure; - } else { - /* can happen for sinks of passthrough elements */ - } + /* pre_activate returns TRUE if we weren't already in the process of + * switching to the 'new' mode */ + if (pre_activate (pad, new)) { - post_activate (pad, new); + if (GST_PAD_ACTIVATEMODEFUNC (pad)) { + if (G_UNLIKELY (!GST_PAD_ACTIVATEMODEFUNC (pad) (pad, parent, mode, + active))) + goto failure; + } else { + /* can happen for sinks of passthrough elements */ + } + + post_activate (pad, new); + } GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "%s in %s mode", active ? "activated" : "deactivated", gst_pad_mode_get_name (mode)); @@ -1194,16 +1239,8 @@ exit_success: } exit: - RELEASE_PARENT (parent); - return res; -no_parent: - { - GST_DEBUG_OBJECT (pad, "no parent"); - GST_OBJECT_UNLOCK (pad); - return FALSE; - } was_ok: { GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "already %s in %s mode", @@ -1240,12 +1277,70 @@ failure: active ? "activate" : "deactivate", gst_pad_mode_get_name (mode)); GST_PAD_SET_FLUSHING (pad); GST_PAD_MODE (pad) = old; + pad->priv->in_activation = FALSE; + g_cond_broadcast (&pad->priv->activation_cond); GST_OBJECT_UNLOCK (pad); goto exit; } } /** + * gst_pad_activate_mode: + * @pad: the #GstPad to activate or deactivate. + * @mode: the requested activation mode + * @active: whether or not the pad should be active. + * + * Activates or deactivates the given pad in @mode via dispatching to the + * pad's activatemodefunc. For use from within pad activation functions only. + * + * If you don't know what this is, you probably don't want to call it. + * + * Returns: %TRUE if the operation was successful. + * + * MT safe. + */ +gboolean +gst_pad_activate_mode (GstPad * pad, GstPadMode mode, gboolean active) +{ + GstObject *parent; + gboolean res; + GstPadMode old, new; + + g_return_val_if_fail (GST_IS_PAD (pad), FALSE); + + GST_OBJECT_LOCK (pad); + + old = GST_PAD_MODE (pad); + new = active ? mode : GST_PAD_MODE_NONE; + if (old == new) + goto was_ok; + + ACQUIRE_PARENT (pad, parent, no_parent); + + GST_OBJECT_UNLOCK (pad); + + res = activate_mode_internal (pad, parent, mode, active); + + RELEASE_PARENT (parent); + + return res; + +was_ok: + { + GST_OBJECT_UNLOCK (pad); + GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "already %s in %s mode", + active ? "activated" : "deactivated", gst_pad_mode_get_name (mode)); + return TRUE; + } +no_parent: + { + GST_WARNING_OBJECT (pad, "no parent"); + GST_OBJECT_UNLOCK (pad); + return FALSE; + } +} + +/** * gst_pad_is_active: * @pad: the #GstPad to query * @@ -1274,6 +1369,9 @@ cleanup_hook (GstPad * pad, GHook * hook) { GstPadProbeType type; + GST_DEBUG_OBJECT (pad, + "cleaning up hook %lu with flags %08x", hook->hook_id, hook->flags); + if (!G_HOOK_IS_VALID (hook)) return; @@ -1309,6 +1407,13 @@ cleanup_hook (GstPad * pad, GHook * hook) * Be notified of different states of pads. The provided callback is called for * every state that matches @mask. * + * Probes are called in groups: First GST_PAD_PROBE_TYPE_BLOCK probes are + * called, then others, then finally GST_PAD_PROBE_TYPE_IDLE. The only + * exception here are GST_PAD_PROBE_TYPE_IDLE probes that are called + * immediately if the pad is already idle while calling gst_pad_add_probe(). + * In each of the groups, probes are called in the order in which they were + * added. + * * Returns: an id or 0 if no probe is pending. The id can be used to remove the * probe with gst_pad_remove_probe(). When using GST_PAD_PROBE_TYPE_IDLE it can * happen that the probe can be run immediately and if the probe returns @@ -1335,9 +1440,9 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "adding probe for mask 0x%08x", mask); - /* when no contraints are given for the types, assume all types are + /* when no constraints are given for the types, assume all types are * acceptable */ - if ((mask & GST_PAD_PROBE_TYPE_ALL_BOTH) == 0) + if ((mask & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH) == 0) mask |= GST_PAD_PROBE_TYPE_ALL_BOTH; if ((mask & GST_PAD_PROBE_TYPE_SCHEDULING) == 0) mask |= GST_PAD_PROBE_TYPE_SCHEDULING; @@ -1347,12 +1452,11 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask, hook->func = callback; hook->data = user_data; hook->destroy = destroy_data; - PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1); /* add the probe */ - g_hook_prepend (&pad->probes, hook); + g_hook_append (&pad->probes, hook); pad->num_probes++; - /* incremenent cookie so that the new hook get's called */ + /* incremenent cookie so that the new hook gets called */ pad->priv->probe_list_cookie++; /* get the id of the hook, we return this and it can be used to remove the @@ -1387,6 +1491,7 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask, /* Keep another ref, the callback could destroy the pad */ gst_object_ref (pad); + pad->priv->idle_running++; /* the pad is idle now, we can signal the idle callback now */ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, @@ -1412,10 +1517,17 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask, case GST_PAD_PROBE_OK: GST_DEBUG_OBJECT (pad, "probe returned OK"); break; + case GST_PAD_PROBE_HANDLED: + GST_DEBUG_OBJECT (pad, "probe handled the data"); + break; default: GST_DEBUG_OBJECT (pad, "probe returned %d", ret); break; } + pad->priv->idle_running--; + if (pad->priv->idle_running == 0) { + GST_PAD_BLOCK_BROADCAST (pad); + } GST_OBJECT_UNLOCK (pad); gst_object_unref (pad); @@ -1796,6 +1908,53 @@ gst_pad_set_event_function_full (GstPad * pad, GstPadEventFunction event, GST_DEBUG_FUNCPTR_NAME (event)); } +static gboolean +event_wrap (GstPad * pad, GstObject * object, GstEvent * event) +{ + GstFlowReturn ret; + + ret = GST_PAD_EVENTFULLFUNC (pad) (pad, object, event); + if (ret == GST_FLOW_OK) + return TRUE; + return FALSE; +} + +/** + * gst_pad_set_event_full_function: + * @p: a #GstPad of either direction. + * @f: the #GstPadEventFullFunction to set. + * + * Calls gst_pad_set_event_full_function_full() with %NULL for the user_data and + * notify. + */ +/** + * gst_pad_set_event_full_function_full: + * @pad: a #GstPad of either direction. + * @event: the #GstPadEventFullFunction to set. + * @user_data: user_data passed to @notify + * @notify: notify called when @event will not be used anymore. + * + * Sets the given event handler for the pad. + * + * Since: 1.8 + */ +void +gst_pad_set_event_full_function_full (GstPad * pad, + GstPadEventFullFunction event, gpointer user_data, GDestroyNotify notify) +{ + g_return_if_fail (GST_IS_PAD (pad)); + + if (pad->eventnotify) + pad->eventnotify (pad->eventdata); + GST_PAD_EVENTFULLFUNC (pad) = event; + GST_PAD_EVENTFUNC (pad) = event_wrap; + pad->eventdata = user_data; + pad->eventnotify = notify; + + GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "eventfullfunc for set to %s", + GST_DEBUG_FUNCPTR_NAME (event)); +} + /** * gst_pad_set_query_function: * @p: a #GstPad of either direction. @@ -1923,6 +2082,10 @@ gst_pad_set_link_function_full (GstPad * pad, GstPadLinkFunction link, * * Sets the given unlink function for the pad. It will be called * when the pad is unlinked. + * + * Note that the pad's lock is already held when the unlink + * function is called, so most pad functions cannot be called + * from within the callback. */ void gst_pad_set_unlink_function_full (GstPad * pad, GstPadUnlinkFunction unlink, @@ -1964,6 +2127,8 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad) g_return_val_if_fail (GST_IS_PAD (sinkpad), FALSE); g_return_val_if_fail (GST_PAD_IS_SINK (sinkpad), FALSE); + GST_TRACER_PAD_UNLINK_PRE (srcpad, sinkpad); + GST_CAT_INFO (GST_CAT_ELEMENT_PADS, "unlinking %s:%s(%p) and %s:%s(%p)", GST_DEBUG_PAD_NAME (srcpad), srcpad, GST_DEBUG_PAD_NAME (sinkpad), sinkpad); @@ -2031,6 +2196,7 @@ done: GST_STRUCTURE_CHANGE_TYPE_PAD_UNLINK, parent, FALSE)); gst_object_unref (parent); } + GST_TRACER_PAD_UNLINK_POST (srcpad, sinkpad, result); return result; /* ERRORS */ @@ -2340,6 +2506,8 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) g_return_val_if_fail (GST_PAD_IS_SINK (sinkpad), GST_PAD_LINK_WRONG_DIRECTION); + GST_TRACER_PAD_LINK_PRE (srcpad, sinkpad); + /* Notify the parent early. See gst_pad_unlink for details. */ if (G_LIKELY ((parent = GST_ELEMENT_CAST (gst_pad_get_parent (srcpad))))) { if (G_LIKELY (GST_IS_ELEMENT (parent))) { @@ -2355,8 +2523,12 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) /* prepare will also lock the two pads */ result = gst_pad_link_prepare (srcpad, sinkpad, flags); - if (G_UNLIKELY (result != GST_PAD_LINK_OK)) + if (G_UNLIKELY (result != GST_PAD_LINK_OK)) { + GST_CAT_INFO (GST_CAT_PADS, "link between %s:%s and %s:%s failed: %s", + GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad), + gst_pad_link_get_name (result)); goto done; + } /* must set peers before calling the link function */ GST_PAD_PEER (srcpad) = sinkpad; @@ -2413,7 +2585,8 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags) GST_CAT_INFO (GST_CAT_PADS, "linked %s:%s and %s:%s, successful", GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad)); - gst_pad_send_event (srcpad, gst_event_new_reconfigure ()); + if (!(flags & GST_PAD_LINK_CHECK_NO_RECONFIGURE)) + gst_pad_send_event (srcpad, gst_event_new_reconfigure ()); done: if (G_LIKELY (parent)) { @@ -2423,6 +2596,7 @@ done: gst_object_unref (parent); } + GST_TRACER_PAD_LINK_POST (srcpad, sinkpad, result); return result; /* ERRORS */ @@ -2542,7 +2716,8 @@ gst_pad_has_current_caps (GstPad * pad) * Gets the capabilities currently configured on @pad with the last * #GST_EVENT_CAPS event. * - * Returns: the current caps of the pad with incremented ref-count. + * Returns: (transfer full) (nullable): the current caps of the pad with + * incremented ref-count or %NULL when pad has no caps. Unref after usage. */ GstCaps * gst_pad_get_current_caps (GstPad * pad) @@ -2588,7 +2763,7 @@ gst_pad_get_pad_template_caps (GstPad * pad) * Gets the peer of @pad. This function refs the peer pad so * you need to unref it after use. * - * Returns: (transfer full): the peer #GstPad. Unref after usage. + * Returns: (transfer full) (nullable): the peer #GstPad. Unref after usage. * * MT safe. */ @@ -2629,30 +2804,41 @@ GstCaps * gst_pad_get_allowed_caps (GstPad * pad) { GstCaps *mycaps; - GstCaps *caps; - GstPad *peer; + GstCaps *caps = NULL; + GstQuery *query; g_return_val_if_fail (GST_IS_PAD (pad), NULL); GST_OBJECT_LOCK (pad); - peer = GST_PAD_PEER (pad); - if (G_UNLIKELY (peer == NULL)) + if (G_UNLIKELY (GST_PAD_PEER (pad) == NULL)) goto no_peer; + GST_OBJECT_UNLOCK (pad); GST_CAT_DEBUG_OBJECT (GST_CAT_PROPERTIES, pad, "getting allowed caps"); - gst_object_ref (peer); - GST_OBJECT_UNLOCK (pad); mycaps = gst_pad_query_caps (pad, NULL); - caps = gst_pad_query_caps (peer, mycaps); - gst_object_unref (peer); + /* Query peer caps */ + query = gst_query_new_caps (mycaps); + if (!gst_pad_peer_query (pad, query)) { + GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, "Caps query failed"); + goto end; + } - gst_caps_unref (mycaps); + gst_query_parse_caps_result (query, &caps); + if (caps == NULL) { + g_warn_if_fail (caps != NULL); + goto end; + } + gst_caps_ref (caps); GST_CAT_DEBUG_OBJECT (GST_CAT_CAPS, pad, "allowed caps %" GST_PTR_FORMAT, caps); +end: + gst_query_unref (query); + gst_caps_unref (mycaps); + return caps; no_peer: @@ -2770,7 +2956,7 @@ no_parent: { GST_DEBUG_OBJECT (pad, "no parent"); GST_OBJECT_UNLOCK (pad); - return FALSE; + return NULL; } } @@ -2884,7 +3070,7 @@ event_forward_func (GstPad * pad, EventData * data) * The EOS event will pause the task associated with @pad before it is forwarded * to all internally linked pads, * - * The the event is sent to all pads internally linked to @pad. This function + * The event is sent to all pads internally linked to @pad. This function * takes ownership of @event. * * Returns: %TRUE if the event was sent successfully. @@ -2937,7 +3123,7 @@ static gboolean gst_pad_query_accept_caps_default (GstPad * pad, GstQuery * query) { /* get the caps and see if it intersects to something not empty */ - GstCaps *caps, *allowed; + GstCaps *caps, *allowed = NULL; gboolean result; GST_DEBUG_OBJECT (pad, "query accept-caps %" GST_PTR_FORMAT, query); @@ -2945,16 +3131,23 @@ gst_pad_query_accept_caps_default (GstPad * pad, GstQuery * query) /* first forward the query to internally linked pads when we are dealing with * a PROXY CAPS */ if (GST_PAD_IS_PROXY_CAPS (pad)) { - if ((result = gst_pad_proxy_query_accept_caps (pad, query))) { + result = gst_pad_proxy_query_accept_caps (pad, query); + if (result) + allowed = gst_pad_get_pad_template_caps (pad); + else goto done; - } } - GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, pad, - "fallback ACCEPT_CAPS query, consider implementing a specialized version"); - gst_query_parse_accept_caps (query, &caps); - allowed = gst_pad_query_caps (pad, caps); + if (!allowed) { + if (GST_PAD_IS_ACCEPT_TEMPLATE (pad)) { + allowed = gst_pad_get_pad_template_caps (pad); + } else { + GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, pad, + "fallback ACCEPT_CAPS query, consider implementing a specialized version"); + allowed = gst_pad_query_caps (pad, caps); + } + } if (allowed) { if (GST_PAD_IS_ACCEPT_INTERSECT (pad)) { @@ -3050,6 +3243,123 @@ done: return TRUE; } +/* Default latency implementation */ +typedef struct +{ + gboolean live; + GstClockTime min, max; +} LatencyFoldData; + +static gboolean +query_latency_default_fold (const GValue * item, GValue * ret, + gpointer user_data) +{ + GstPad *pad = g_value_get_object (item), *peer; + LatencyFoldData *fold_data = user_data; + GstQuery *query; + gboolean res = FALSE; + + query = gst_query_new_latency (); + + peer = gst_pad_get_peer (pad); + if (peer) { + res = gst_pad_peer_query (pad, query); + } else { + GST_LOG_OBJECT (pad, "No peer pad found, ignoring this pad"); + } + + if (res) { + gboolean live; + GstClockTime min, max; + + gst_query_parse_latency (query, &live, &min, &max); + + GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT + " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); + + if (live) { + if (min > fold_data->min) + fold_data->min = min; + + if (fold_data->max == GST_CLOCK_TIME_NONE) + fold_data->max = max; + else if (max < fold_data->max) + fold_data->max = max; + + fold_data->live = TRUE; + } + } else if (peer) { + GST_DEBUG_OBJECT (pad, "latency query failed"); + g_value_set_boolean (ret, FALSE); + } + + gst_query_unref (query); + if (peer) + gst_object_unref (peer); + + return TRUE; +} + +static gboolean +gst_pad_query_latency_default (GstPad * pad, GstQuery * query) +{ + GstIterator *it; + GstIteratorResult res; + GValue ret = G_VALUE_INIT; + gboolean query_ret; + LatencyFoldData fold_data; + + it = gst_pad_iterate_internal_links (pad); + if (!it) { + GST_DEBUG_OBJECT (pad, "Can't iterate internal links"); + return FALSE; + } + + g_value_init (&ret, G_TYPE_BOOLEAN); + +retry: + fold_data.live = FALSE; + fold_data.min = 0; + fold_data.max = GST_CLOCK_TIME_NONE; + + g_value_set_boolean (&ret, TRUE); + res = gst_iterator_fold (it, query_latency_default_fold, &ret, &fold_data); + switch (res) { + case GST_ITERATOR_OK: + g_assert_not_reached (); + break; + case GST_ITERATOR_DONE: + break; + case GST_ITERATOR_ERROR: + g_value_set_boolean (&ret, FALSE); + break; + case GST_ITERATOR_RESYNC: + gst_iterator_resync (it); + goto retry; + default: + g_assert_not_reached (); + break; + } + gst_iterator_free (it); + + query_ret = g_value_get_boolean (&ret); + if (query_ret) { + GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT + " max:%" G_GINT64_FORMAT, fold_data.live ? "true" : "false", + fold_data.min, fold_data.max); + + if (fold_data.min > fold_data.max) { + GST_ERROR_OBJECT (pad, "minimum latency bigger than maximum latency"); + } + + gst_query_set_latency (query, fold_data.live, fold_data.min, fold_data.max); + } else { + GST_LOG_OBJECT (pad, "latency query failed"); + } + + return query_ret; +} + typedef struct { GstQuery *query; @@ -3105,10 +3415,17 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query) ret = gst_pad_query_caps_default (pad, query); forward = FALSE; break; + case GST_QUERY_LATENCY: + ret = gst_pad_query_latency_default (pad, query); + forward = FALSE; + break; + case GST_QUERY_BITRATE: + /* FIXME: better default handling */ + forward = TRUE; + break; case GST_QUERY_POSITION: case GST_QUERY_SEEKING: case GST_QUERY_FORMATS: - case GST_QUERY_LATENCY: case GST_QUERY_JITTER: case GST_QUERY_RATE: case GST_QUERY_CONVERT: @@ -3142,6 +3459,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query) return ret; } +#define N_STACK_ALLOCATE_PROBES (16) + static void probe_hook_marshal (GHook * hook, ProbeMarshall * data) { @@ -3150,27 +3469,71 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) GstPadProbeType type, flags; GstPadProbeCallback callback; GstPadProbeReturn ret; - - /* if we have called this callback, do nothing */ - if (PROBE_COOKIE (hook) == data->cookie) { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "hook %lu, cookie %u already called", hook->hook_id, - PROBE_COOKIE (hook)); - return; + gpointer original_data; + guint i; + + /* if we have called this callback, do nothing. But only check + * if we're actually calling probes a second time */ + if (data->retry) { + for (i = 0; i < data->n_called_probes; i++) { + if (data->called_probes[i] == hook->hook_id) { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "hook %lu already called", hook->hook_id); + return; + } + } } - PROBE_COOKIE (hook) = data->cookie; + /* reallocate on the heap if we had more than 16 probes */ + if (data->n_called_probes == data->called_probes_size) { + if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) { + data->called_probes_size *= 2; + data->called_probes = + g_renew (gulong, data->called_probes, data->called_probes_size); + } else { + gulong *tmp = data->called_probes; + + data->called_probes_size *= 2; + data->called_probes = g_new (gulong, data->called_probes_size); + memcpy (data->called_probes, tmp, + N_STACK_ALLOCATE_PROBES * sizeof (gulong)); + } + } + data->called_probes[data->n_called_probes++] = hook->hook_id; flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT; type = info->type; + original_data = info->data; - /* one of the data types for non-idle probes */ - if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0 - && (flags & GST_PAD_PROBE_TYPE_ALL_BOTH & type) == 0) - goto no_match; /* one of the scheduling types */ if ((flags & GST_PAD_PROBE_TYPE_SCHEDULING & type) == 0) goto no_match; + + if (G_UNLIKELY (data->handled)) { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "probe previously returned HANDLED, not calling again"); + goto no_match; + } else if (G_UNLIKELY (data->dropped)) { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "probe previously returned DROPPED, not calling again"); + goto no_match; + } + + if (type & GST_PAD_PROBE_TYPE_PUSH) { + /* one of the data types for non-idle probes */ + if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0 + && (flags & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH & type) == 0) + goto no_match; + } else if (type & GST_PAD_PROBE_TYPE_PULL) { + /* one of the data types for non-idle probes */ + if ((type & GST_PAD_PROBE_TYPE_BLOCKING) == 0 + && (flags & _PAD_PROBE_TYPE_ALL_BOTH_AND_FLUSH & type) == 0) + goto no_match; + } else { + /* Type must have PULL or PUSH probe types */ + g_assert_not_reached (); + } + /* one of the blocking types must match */ if ((type & GST_PAD_PROBE_TYPE_BLOCKING) && (flags & GST_PAD_PROBE_TYPE_BLOCKING & type) == 0) @@ -3184,8 +3547,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) goto no_match; GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id, - PROBE_COOKIE (hook), flags); + "hook %lu with flags 0x%08x matches", hook->hook_id, flags); data->marshalled = TRUE; @@ -3195,12 +3557,24 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) info->id = hook->hook_id; + if ((flags & GST_PAD_PROBE_TYPE_IDLE)) + pad->priv->idle_running++; + GST_OBJECT_UNLOCK (pad); ret = callback (pad, info, hook->data); GST_OBJECT_LOCK (pad); + if ((flags & GST_PAD_PROBE_TYPE_IDLE)) + pad->priv->idle_running--; + + if (original_data != NULL && info->data == NULL) { + GST_DEBUG_OBJECT (pad, "data item in pad probe info was dropped"); + info->type = GST_PAD_PROBE_TYPE_INVALID; + data->dropped = TRUE; + } + switch (ret) { case GST_PAD_PROBE_REMOVE: /* remove the probe */ @@ -3214,6 +3588,10 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) info->type = GST_PAD_PROBE_TYPE_INVALID; data->dropped = TRUE; break; + case GST_PAD_PROBE_HANDLED: + GST_DEBUG_OBJECT (pad, "probe handled data"); + data->handled = TRUE; + break; case GST_PAD_PROBE_PASS: /* inform the pad block to let things pass */ GST_DEBUG_OBJECT (pad, "asked to pass item"); @@ -3231,8 +3609,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data) no_match: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "hook %lu, cookie %u with flags 0x%08x does not match %08x", - hook->hook_id, PROBE_COOKIE (hook), flags, info->type); + "hook %lu with flags 0x%08x does not match %08x", + hook->hook_id, flags, info->type); return; } } @@ -3243,31 +3621,72 @@ no_match: if (G_UNLIKELY (pad->num_probes)) { \ GstFlowReturn pval = defaultval; \ /* pass NULL as the data item */ \ - GstPadProbeInfo info = { mask, 0, NULL, 0, 0 }; \ + GstPadProbeInfo info = { mask, 0, NULL, 0, 0 }; \ + info.ABI.abi.flow_ret = defaultval; \ ret = do_probe_callbacks (pad, &info, defaultval); \ if (G_UNLIKELY (ret != pval && ret != GST_FLOW_OK)) \ goto label; \ } \ } G_STMT_END -#define PROBE_FULL(pad,mask,data,offs,size,label) \ - G_STMT_START { \ - if (G_UNLIKELY (pad->num_probes)) { \ - /* pass the data item */ \ - GstPadProbeInfo info = { mask, 0, data, offs, size }; \ - ret = do_probe_callbacks (pad, &info, GST_FLOW_OK); \ - /* store the possibly updated data item */ \ - data = GST_PAD_PROBE_INFO_DATA (&info); \ - /* if something went wrong, exit */ \ - if (G_UNLIKELY (ret != GST_FLOW_OK)) \ - goto label; \ - } \ +#define PROBE_FULL(pad,mask,data,offs,size,label,handleable,handle_label) \ + G_STMT_START { \ + if (G_UNLIKELY (pad->num_probes)) { \ + /* pass the data item */ \ + GstPadProbeInfo info = { mask, 0, data, offs, size }; \ + info.ABI.abi.flow_ret = GST_FLOW_OK; \ + ret = do_probe_callbacks (pad, &info, GST_FLOW_OK); \ + /* store the possibly updated data item */ \ + data = GST_PAD_PROBE_INFO_DATA (&info); \ + /* if something went wrong, exit */ \ + if (G_UNLIKELY (ret != GST_FLOW_OK)) { \ + if (handleable && ret == GST_FLOW_CUSTOM_SUCCESS_1) { \ + ret = info.ABI.abi.flow_ret; \ + goto handle_label; \ + } \ + goto label; \ + } \ + } \ } G_STMT_END -#define PROBE_PUSH(pad,mask,data,label) \ - PROBE_FULL(pad, mask, data, -1, -1, label); -#define PROBE_PULL(pad,mask,data,offs,size,label) \ - PROBE_FULL(pad, mask, data, offs, size, label); +#define PROBE_PUSH(pad,mask,data,label) \ + PROBE_FULL(pad, mask, data, -1, -1, label, FALSE, label); +#define PROBE_HANDLE(pad,mask,data,label,handle_label) \ + PROBE_FULL(pad, mask, data, -1, -1, label, TRUE, handle_label); +#define PROBE_PULL(pad,mask,data,offs,size,label) \ + PROBE_FULL(pad, mask, data, offs, size, label, FALSE, label); + +static GstFlowReturn +do_pad_idle_probe_wait (GstPad * pad) +{ + while (GST_PAD_IS_RUNNING_IDLE_PROBE (pad)) { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "waiting idle probe to be removed"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_BLOCKING); + GST_PAD_BLOCK_WAIT (pad); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked"); + + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + return GST_FLOW_FLUSHING; + } + return GST_FLOW_OK; +} + +#define PROBE_TYPE_IS_SERIALIZED(i) \ + ( \ + ( \ + (((i)->type & (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | \ + GST_PAD_PROBE_TYPE_EVENT_FLUSH)) && \ + GST_EVENT_IS_SERIALIZED ((i)->data)) \ + ) || ( \ + (((i)->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) && \ + GST_QUERY_IS_SERIALIZED ((i)->data)) \ + ) || ( \ + ((i)->type & (GST_PAD_PROBE_TYPE_BUFFER | \ + GST_PAD_PROBE_TYPE_BUFFER_LIST)) \ + ) \ + ) static GstFlowReturn do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info, @@ -3276,30 +3695,44 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info, ProbeMarshall data; guint cookie; gboolean is_block; + gulong called_probes[N_STACK_ALLOCATE_PROBES]; data.pad = pad; data.info = info; data.pass = FALSE; + data.handled = FALSE; data.marshalled = FALSE; data.dropped = FALSE; - data.cookie = ++pad->priv->probe_cookie; + + /* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed, + * we will re-allocate with g_malloc(). This should usually never be needed + */ + data.called_probes = called_probes; + data.n_called_probes = 0; + data.called_probes_size = N_STACK_ALLOCATE_PROBES; + data.retry = FALSE; is_block = (info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK; + if (is_block && PROBE_TYPE_IS_SERIALIZED (info)) { + if (do_pad_idle_probe_wait (pad) == GST_FLOW_FLUSHING) + goto flushing; + } + again: - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "do probes cookie %u", data.cookie); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes"); cookie = pad->priv->probe_list_cookie; g_hook_list_marshal (&pad->probes, TRUE, (GHookMarshaller) probe_hook_marshal, &data); - /* if the list changed, call the new callbacks (they will not have their - * cookie set to data.cookie */ + /* if the list changed, call the new callbacks (they will not be in + * called_probes yet) */ if (cookie != pad->priv->probe_list_cookie) { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "probe list changed, restarting"); + data.retry = TRUE; goto again; } @@ -3307,6 +3740,11 @@ again: if (data.dropped) goto dropped; + /* If one handler took care of it, let the the item pass */ + if (data.handled) { + goto handled; + } + /* if no handler matched and we are blocking, let the item pass */ if (!data.marshalled && is_block) goto passed; @@ -3336,11 +3774,12 @@ again: GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING); GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked"); - /* if the list changed, call the new callbacks (they will not have their - * cookie set to data.cookie */ + /* if the list changed, call the new callbacks (they will not be in + * called_probes yet) */ if (cookie != pad->priv->probe_list_cookie) { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "probe list changed, restarting"); + data.retry = TRUE; goto again; } @@ -3349,25 +3788,41 @@ again: } } + if (data.called_probes_size > N_STACK_ALLOCATE_PROBES) + g_free (data.called_probes); + return defaultval; /* ERRORS */ flushing: { GST_DEBUG_OBJECT (pad, "pad is flushing"); + if (data.called_probes_size > N_STACK_ALLOCATE_PROBES) + g_free (data.called_probes); return GST_FLOW_FLUSHING; } dropped: { GST_DEBUG_OBJECT (pad, "data is dropped"); + if (data.called_probes_size > N_STACK_ALLOCATE_PROBES) + g_free (data.called_probes); return GST_FLOW_CUSTOM_SUCCESS; } passed: { /* FIXME : Should we return FLOW_OK or the defaultval ?? */ GST_DEBUG_OBJECT (pad, "data is passed"); + if (data.called_probes_size > N_STACK_ALLOCATE_PROBES) + g_free (data.called_probes); return GST_FLOW_OK; } +handled: + { + GST_DEBUG_OBJECT (pad, "data was handled"); + if (data.called_probes_size > N_STACK_ALLOCATE_PROBES) + g_free (data.called_probes); + return GST_FLOW_CUSTOM_SUCCESS_1; + } } /* pad offsets */ @@ -3420,7 +3875,8 @@ gst_pad_set_offset (GstPad * pad, gint64 offset) goto done; pad->offset = offset; - GST_DEBUG_OBJECT (pad, "changed offset to %" G_GINT64_FORMAT, offset); + GST_DEBUG_OBJECT (pad, "changed offset to %" GST_STIME_FORMAT, + GST_STIME_ARGS (offset)); /* resend all sticky events with updated offset on next buffer push */ events_foreach (pad, mark_event_not_received, NULL); @@ -3469,6 +3925,8 @@ push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data) } else { data->ret = gst_pad_push_event_unchecked (pad, gst_event_ref (event), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM); + if (data->ret == GST_FLOW_CUSTOM_SUCCESS_1) + data->ret = GST_FLOW_OK; } switch (data->ret) { @@ -3493,11 +3951,12 @@ push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data) break; case GST_FLOW_NOT_LINKED: /* not linked is not a problem, we are sticky so the event will be - * sent later but only for non-EOS events */ + * rescheduled to be sent later on re-link, but only for non-EOS events */ GST_DEBUG_OBJECT (pad, "pad was not linked, mark pending"); - if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) + if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) { data->ret = GST_FLOW_OK; - GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); + ev->received = TRUE; + } break; default: GST_DEBUG_OBJECT (pad, "result %s, mark pending events", @@ -3592,6 +4051,7 @@ gst_pad_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (pad, "doing query %p (%s)", query, GST_QUERY_TYPE_NAME (query)); + GST_TRACER_PAD_QUERY_PRE (pad, query); serialized = GST_QUERY_IS_SERIALIZED (query); if (G_UNLIKELY (serialized)) @@ -3614,6 +4074,7 @@ gst_pad_query (GstPad * pad, GstQuery * query) GST_DEBUG_OBJECT (pad, "sent query %p (%s), result %d", query, GST_QUERY_TYPE_NAME (query), res); + GST_TRACER_PAD_QUERY_POST (pad, query, res); if (res != TRUE) goto query_failed; @@ -3669,9 +4130,12 @@ probe_stopped: if (G_UNLIKELY (serialized)) GST_PAD_STREAM_UNLOCK (pad); - /* if a probe dropped, we don't sent it further but assume that the probe - * did not answer the query and return FALSE */ - res = FALSE; + /* if a probe dropped without handling, we don't sent it further but assume + * that the probe did not answer the query and return FALSE */ + if (ret != GST_FLOW_CUSTOM_SUCCESS_1) + res = FALSE; + else + res = TRUE; return res; } @@ -3783,9 +4247,12 @@ probe_stopped: GST_DEBUG_OBJECT (pad, "probe stopped: %s", gst_flow_get_name (ret)); GST_OBJECT_UNLOCK (pad); - /* if a probe dropped, we don't sent it further but assume that the probe - * did not answer the query and return FALSE */ - res = FALSE; + /* if a probe dropped without handling, we don't sent it further but + * assume that the probe did not answer the query and return FALSE */ + if (ret != GST_FLOW_CUSTOM_SUCCESS_1) + res = FALSE; + else + res = TRUE; return res; } @@ -3803,6 +4270,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) { GstFlowReturn ret; GstObject *parent; + gboolean handled = FALSE; GST_PAD_STREAM_LOCK (pad); @@ -3816,7 +4284,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_PUSH)) goto wrong_mode; -#ifndef G_DISABLE_ASSERT +#ifdef GST_ENABLE_EXTRA_CHECKS if (G_UNLIKELY (pad->priv->last_cookie != pad->priv->events_cookie)) { if (!find_event_by_type (pad, GST_EVENT_STREAM_START, 0)) { g_warning (G_STRLOC @@ -3832,11 +4300,12 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) } #endif - PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped); + PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped, + probe_handled); - PROBE_PUSH (pad, type, data, probe_stopped); + PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled); - parent = GST_OBJECT_PARENT (pad); + ACQUIRE_PARENT (pad, parent, no_parent); GST_OBJECT_UNLOCK (pad); /* NOTE: we read the chainfunc unlocked. @@ -3850,13 +4319,13 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY ((chainfunc = GST_PAD_CHAINFUNC (pad)) == NULL)) goto no_function; - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad, "calling chainfunction &%s with buffer %" GST_PTR_FORMAT, GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_BUFFER (data)); ret = chainfunc (pad, parent, GST_BUFFER_CAST (data)); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad, "called chainfunction &%s with buffer %p, returned %s", GST_DEBUG_FUNCPTR_NAME (chainfunc), data, gst_flow_get_name (ret)); } else { @@ -3865,17 +4334,19 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY ((chainlistfunc = GST_PAD_CHAINLISTFUNC (pad)) == NULL)) goto no_function; - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad, "calling chainlistfunction &%s", GST_DEBUG_FUNCPTR_NAME (chainlistfunc)); ret = chainlistfunc (pad, parent, GST_BUFFER_LIST_CAST (data)); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + GST_CAT_DEBUG_OBJECT (GST_CAT_SCHEDULING, pad, "called chainlistfunction &%s, returned %s", GST_DEBUG_FUNCPTR_NAME (chainlistfunc), gst_flow_get_name (ret)); } + RELEASE_PARENT (parent); + GST_PAD_STREAM_UNLOCK (pad); return ret; @@ -3907,15 +4378,21 @@ wrong_mode: gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return GST_FLOW_ERROR; } +probe_handled: + handled = TRUE; + /* PASSTHROUGH */ probe_stopped: { GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + /* We unref the buffer, except if the probe handled it (CUSTOM_SUCCESS_1) */ + if (!handled) + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); switch (ret) { case GST_FLOW_CUSTOM_SUCCESS: - GST_DEBUG_OBJECT (pad, "dropped buffer"); + case GST_FLOW_CUSTOM_SUCCESS_1: + GST_DEBUG_OBJECT (pad, "dropped or handled buffer"); ret = GST_FLOW_OK; break; default: @@ -3924,8 +4401,17 @@ probe_stopped: } return ret; } +no_parent: + { + GST_DEBUG_OBJECT (pad, "No parent when chaining %" GST_PTR_FORMAT, data); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + GST_OBJECT_UNLOCK (pad); + GST_PAD_STREAM_UNLOCK (pad); + return GST_FLOW_FLUSHING; + } no_function: { + RELEASE_PARENT (parent); gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); g_critical ("chain on pad %s:%s but it has no chainfunction", GST_DEBUG_PAD_NAME (pad)); @@ -3979,7 +4465,7 @@ gst_pad_chain_list_default (GstPad * pad, GstObject * parent, GstBuffer *buffer; GstFlowReturn ret; - GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer"); + GST_INFO_OBJECT (pad, "chaining each buffer in list individually"); len = gst_buffer_list_length (list); @@ -4039,6 +4525,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data) { GstPad *peer; GstFlowReturn ret; + gboolean handled = FALSE; GST_OBJECT_LOCK (pad); if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) @@ -4050,7 +4537,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data) if (G_UNLIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_PUSH)) goto wrong_mode; -#ifndef G_DISABLE_ASSERT +#ifdef GST_ENABLE_EXTRA_CHECKS if (G_UNLIKELY (pad->priv->last_cookie != pad->priv->events_cookie)) { if (!find_event_by_type (pad, GST_EVENT_STREAM_START, 0)) { g_warning (G_STRLOC @@ -4070,14 +4557,19 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data) goto events_error; /* do block probes */ - PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped); + PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped, + probe_handled); /* recheck sticky events because the probe might have cause a relink */ if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK) goto events_error; /* do post-blocking probes */ - PROBE_PUSH (pad, type, data, probe_stopped); + PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled); + + /* recheck sticky events because the probe might have cause a relink */ + if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK) + goto events_error; if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) goto not_linked; @@ -4088,6 +4580,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data) GST_OBJECT_UNLOCK (pad); ret = gst_pad_chain_data_unchecked (peer, type, data); + data = NULL; gst_object_unref (peer); @@ -4140,22 +4633,26 @@ events_error: gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return ret; } +probe_handled: + handled = TRUE; + /* PASSTHROUGH */ probe_stopped: { GST_OBJECT_UNLOCK (pad); - pad->ABI.abi.last_flowret = - ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret; - gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + if (data != NULL && !handled) + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); switch (ret) { case GST_FLOW_CUSTOM_SUCCESS: - GST_DEBUG_OBJECT (pad, "dropped buffer"); + case GST_FLOW_CUSTOM_SUCCESS_1: + GST_DEBUG_OBJECT (pad, "dropped or handled buffer"); ret = GST_FLOW_OK; break; default: GST_DEBUG_OBJECT (pad, "an error occurred %s", gst_flow_get_name (ret)); break; } + pad->ABI.abi.last_flowret = ret; return ret; } not_linked: @@ -4194,12 +4691,17 @@ not_linked: GstFlowReturn gst_pad_push (GstPad * pad, GstBuffer * buffer) { + GstFlowReturn res; + g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); - return gst_pad_push_data (pad, + GST_TRACER_PAD_PUSH_PRE (pad, buffer); + res = gst_pad_push_data (pad, GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_PUSH, buffer); + GST_TRACER_PAD_PUSH_POST (pad, res); + return res; } /** @@ -4229,12 +4731,17 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer) GstFlowReturn gst_pad_push_list (GstPad * pad, GstBufferList * list) { + GstFlowReturn res; + g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR); g_return_val_if_fail (GST_IS_BUFFER_LIST (list), GST_FLOW_ERROR); - return gst_pad_push_data (pad, + GST_TRACER_PAD_PUSH_LIST_PRE (pad, list); + res = gst_pad_push_data (pad, GST_PAD_PROBE_TYPE_BUFFER_LIST | GST_PAD_PROBE_TYPE_PUSH, list); + GST_TRACER_PAD_PUSH_LIST_POST (pad, res); + return res; } static GstFlowReturn @@ -4498,6 +5005,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, g_return_val_if_fail (*buffer == NULL || (GST_IS_BUFFER (*buffer) && gst_buffer_get_size (*buffer) >= size), GST_FLOW_ERROR); + GST_TRACER_PAD_PULL_RANGE_PRE (pad, offset, size); + GST_OBJECT_LOCK (pad); if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; @@ -4544,6 +5053,7 @@ probed_data: *buffer = res_buf; + GST_TRACER_PAD_PULL_RANGE_POST (pad, *buffer, ret); return ret; /* ERROR recovery here */ @@ -4553,7 +5063,8 @@ flushing: "pullrange, but pad was flushing"); pad->ABI.abi.last_flowret = GST_FLOW_FLUSHING; GST_OBJECT_UNLOCK (pad); - return GST_FLOW_FLUSHING; + ret = GST_FLOW_FLUSHING; + goto done; } wrong_mode: { @@ -4561,7 +5072,8 @@ wrong_mode: GST_DEBUG_PAD_NAME (pad)); pad->ABI.abi.last_flowret = GST_FLOW_ERROR; GST_OBJECT_UNLOCK (pad); - return GST_FLOW_ERROR; + ret = GST_FLOW_ERROR; + goto done; } probe_stopped: { @@ -4582,7 +5094,7 @@ probe_stopped: } pad->ABI.abi.last_flowret = ret; GST_OBJECT_UNLOCK (pad); - return ret; + goto done; } not_linked: { @@ -4590,7 +5102,8 @@ not_linked: "pulling range, but it was not linked"); pad->ABI.abi.last_flowret = GST_FLOW_NOT_LINKED; GST_OBJECT_UNLOCK (pad); - return GST_FLOW_NOT_LINKED; + ret = GST_FLOW_NOT_LINKED; + goto done; } pull_range_failed: { @@ -4599,7 +5112,7 @@ pull_range_failed: GST_CAT_LEVEL_LOG (GST_CAT_SCHEDULING, (ret >= GST_FLOW_EOS) ? GST_LEVEL_INFO : GST_LEVEL_WARNING, pad, "pullrange failed, flow: %s", gst_flow_get_name (ret)); - return ret; + goto done; } probe_stopped_unref: { @@ -4615,8 +5128,11 @@ probe_stopped_unref: if (*buffer == NULL) gst_buffer_unref (res_buf); - return ret; + goto done; } +done: + GST_TRACER_PAD_PULL_RANGE_POST (pad, NULL, ret); + return ret; } /* must be called with pad object lock */ @@ -4641,6 +5157,15 @@ store_sticky_event (GstPad * pad, GstEvent * event) || type == GST_EVENT_EOS)))) goto flushed; + /* Unset the EOS flag when received STREAM_START event, so pad can + * store sticky event and then push it later */ + if (type == GST_EVENT_STREAM_START) { + GST_LOG_OBJECT (pad, "Removing pending EOS and StreamGroupDone events"); + remove_event_by_type (pad, GST_EVENT_EOS); + remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS); + } + if (G_UNLIKELY (GST_PAD_IS_EOS (pad))) goto eos; @@ -4733,7 +5258,7 @@ eos: /** * gst_pad_store_sticky_event: * @pad: a #GstPad - * @event: a #GstEvent + * @event: (transfer none): a #GstEvent * * Store the sticky @event on @pad * @@ -4778,6 +5303,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event, GstFlowReturn ret; GstPad *peerpad; GstEventType event_type; + gint64 old_pad_offset = pad->offset; /* pass the adjusted event on. We need to do this even if * there is no peer pad because of the probes. */ @@ -4803,6 +5329,7 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event, /* Remove sticky EOS events */ GST_LOG_OBJECT (pad, "Removing pending EOS events"); remove_event_by_type (pad, GST_EVENT_EOS); + remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE); remove_event_by_type (pad, GST_EVENT_SEGMENT); GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS); pad->ABI.abi.last_flowret = GST_FLOW_OK; @@ -4829,6 +5356,17 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event, } PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH | GST_PAD_PROBE_TYPE_BLOCK, event, probe_stopped); + /* recheck sticky events because the probe might have cause a relink */ + if (GST_PAD_HAS_PENDING_EVENTS (pad) && GST_PAD_IS_SRC (pad) + && (GST_EVENT_IS_SERIALIZED (event) + || GST_EVENT_IS_STICKY (event))) { + PushStickyData data = { GST_FLOW_OK, FALSE, event }; + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS); + + /* Push all sticky events before our current one + * that have changed */ + events_foreach (pad, sticky_changed, &data); + } break; } } @@ -4848,6 +5386,15 @@ gst_pad_push_event_unchecked (GstPad * pad, GstEvent * event, events_foreach (pad, sticky_changed, &data); } + /* the pad offset might've been changed by any of the probes above. It + * would've been taken into account when repushing any of the sticky events + * above but not for our current event here */ + if (G_UNLIKELY (old_pad_offset != pad->offset)) { + event = + _apply_pad_offset (pad, event, GST_PAD_IS_SINK (pad), + pad->offset - old_pad_offset); + } + /* now check the peer pad */ peerpad = GST_PAD_PEER (pad); if (peerpad == NULL) @@ -4895,9 +5442,13 @@ inactive: probe_stopped: { GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS); - gst_event_unref (event); + if (ret != GST_FLOW_CUSTOM_SUCCESS_1) + gst_event_unref (event); switch (ret) { + case GST_FLOW_CUSTOM_SUCCESS_1: + GST_DEBUG_OBJECT (pad, "handled event"); + break; case GST_FLOW_CUSTOM_SUCCESS: GST_DEBUG_OBJECT (pad, "dropped event"); break; @@ -4953,6 +5504,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) g_return_val_if_fail (GST_IS_PAD (pad), FALSE); g_return_val_if_fail (GST_IS_EVENT (event), FALSE); + GST_TRACER_PAD_PUSH_EVENT_PRE (pad, event); + if (GST_PAD_IS_SRC (pad)) { if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event))) goto wrong_direction; @@ -4993,7 +5546,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) /* other events are pushed right away */ ret = gst_pad_push_event_unchecked (pad, event, type); /* dropped events by a probe are not an error */ - res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS); + res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS + || ret == GST_FLOW_CUSTOM_SUCCESS_1); } else { /* Errors in sticky event pushing are no problem and ignored here * as they will cause more meaningful errors during data flow. @@ -5006,6 +5560,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event) } GST_OBJECT_UNLOCK (pad); + GST_TRACER_PAD_PUSH_EVENT_POST (pad, res); return res; /* ERROR handling */ @@ -5014,28 +5569,31 @@ wrong_direction: g_warning ("pad %s:%s pushing %s event in wrong direction", GST_DEBUG_PAD_NAME (pad), GST_EVENT_TYPE_NAME (event)); gst_event_unref (event); - return FALSE; + goto done; } unknown_direction: { g_warning ("pad %s:%s has invalid direction", GST_DEBUG_PAD_NAME (pad)); gst_event_unref (event); - return FALSE; + goto done; } flushed: { GST_DEBUG_OBJECT (pad, "We're flushing"); GST_OBJECT_UNLOCK (pad); gst_event_unref (event); - return FALSE; + goto done; } eos: { GST_DEBUG_OBJECT (pad, "We're EOS"); GST_OBJECT_UNLOCK (pad); gst_event_unref (event); - return FALSE; + goto done; } +done: + GST_TRACER_PAD_PUSH_EVENT_POST (pad, FALSE); + return FALSE; } /* Check if we can call the event function with the given event */ @@ -5076,10 +5634,13 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, GstEventType event_type; gboolean serialized, need_unlock = FALSE, sticky; GstPadEventFunction eventfunc; + GstPadEventFullFunction eventfullfunc = NULL; GstObject *parent; + gint64 old_pad_offset; GST_OBJECT_LOCK (pad); + old_pad_offset = pad->offset; event = apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad)); if (GST_PAD_IS_SINK (pad)) @@ -5099,6 +5660,8 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, GST_PAD_SET_FLUSHING (pad); GST_CAT_DEBUG_OBJECT (GST_CAT_EVENT, pad, "set flush flag"); + GST_PAD_BLOCK_BROADCAST (pad); + type |= GST_PAD_PROBE_TYPE_EVENT_FLUSH; break; case GST_EVENT_FLUSH_STOP: /* we can't accept flush-stop on inactive pads else the flushing flag @@ -5113,6 +5676,7 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, /* Remove pending EOS events */ GST_LOG_OBJECT (pad, "Removing pending EOS and SEGMENT events"); remove_event_by_type (pad, GST_EVENT_EOS); + remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE); remove_event_by_type (pad, GST_EVENT_SEGMENT); GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS); pad->ABI.abi.last_flowret = GST_FLOW_OK; @@ -5135,6 +5699,18 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; + switch (event_type) { + case GST_EVENT_STREAM_START: + /* Remove sticky EOS events */ + GST_LOG_OBJECT (pad, "Removing pending EOS events"); + remove_event_by_type (pad, GST_EVENT_EOS); + remove_event_by_type (pad, GST_EVENT_STREAM_GROUP_DONE); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_EOS); + break; + default: + break; + } + if (serialized) { if (G_UNLIKELY (GST_PAD_IS_EOS (pad))) goto eos; @@ -5160,7 +5736,18 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_PUSH, event, probe_stopped); - if (G_UNLIKELY ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL)) + /* the pad offset might've been changed by any of the probes above. It + * would've been taken into account when repushing any of the sticky events + * above but not for our current event here */ + if (G_UNLIKELY (old_pad_offset != pad->offset)) { + event = + _apply_pad_offset (pad, event, GST_PAD_IS_SRC (pad), + pad->offset - old_pad_offset); + } + + eventfullfunc = GST_PAD_EVENTFULLFUNC (pad); + eventfunc = GST_PAD_EVENTFUNC (pad); + if (G_UNLIKELY (eventfunc == NULL && eventfullfunc == NULL)) goto no_function; ACQUIRE_PARENT (pad, parent, no_parent); @@ -5173,7 +5760,9 @@ gst_pad_send_event_unchecked (GstPad * pad, GstEvent * event, if (sticky) gst_event_ref (event); - if (eventfunc (pad, parent, event)) { + if (eventfullfunc) { + ret = eventfullfunc (pad, parent, event); + } else if (eventfunc (pad, parent, event)) { ret = GST_FLOW_OK; } else { /* something went wrong */ @@ -5249,11 +5838,14 @@ probe_stopped: GST_OBJECT_UNLOCK (pad); if (need_unlock) GST_PAD_STREAM_UNLOCK (pad); - gst_event_unref (event); + /* Only unref if unhandled */ + if (ret != GST_FLOW_CUSTOM_SUCCESS_1) + gst_event_unref (event); switch (ret) { + case GST_FLOW_CUSTOM_SUCCESS_1: case GST_FLOW_CUSTOM_SUCCESS: - GST_DEBUG_OBJECT (pad, "dropped event"); + GST_DEBUG_OBJECT (pad, "dropped or handled event"); ret = GST_FLOW_OK; break; default: @@ -5385,7 +5977,7 @@ gst_pad_set_element_private (GstPad * pad, gpointer priv) * Gets the private data of a pad. * No locking is performed in this function. * - * Returns: (transfer none): a #gpointer to the private data. + * Returns: (transfer none) (nullable): a #gpointer to the private data. */ gpointer gst_pad_get_element_private (GstPad * pad) @@ -5617,6 +6209,9 @@ gst_pad_pause_task (GstPad * pad) if (task == NULL) goto no_task; res = gst_task_set_state (task, GST_TASK_PAUSED); + /* unblock activation waits if any */ + pad->priv->in_activation = FALSE; + g_cond_broadcast (&pad->priv->activation_cond); GST_OBJECT_UNLOCK (pad); /* wait for task function to finish, this lock is recursive so it does nothing @@ -5635,6 +6230,42 @@ no_task: } /** + * gst_pad_get_task_state: + * @pad: the #GstPad to get task state from + * + * Get @pad task state. If no task is currently + * set, #GST_TASK_STOPPED is returned. + * + * Returns: The current state of @pad's task. + * + * Since: 1.12 + */ +GstTaskState +gst_pad_get_task_state (GstPad * pad) +{ + GstTask *task; + GstTaskState res; + + g_return_val_if_fail (GST_IS_PAD (pad), GST_TASK_STOPPED); + + GST_OBJECT_LOCK (pad); + task = GST_PAD_TASK (pad); + if (task == NULL) + goto no_task; + res = gst_task_get_state (task); + GST_OBJECT_UNLOCK (pad); + + return res; + +no_task: + { + GST_DEBUG_OBJECT (pad, "pad has no task"); + GST_OBJECT_UNLOCK (pad); + return GST_TASK_STOPPED; + } +} + +/** * gst_pad_stop_task: * @pad: the #GstPad to stop the task of * @@ -5666,6 +6297,9 @@ gst_pad_stop_task (GstPad * pad) goto no_task; GST_PAD_TASK (pad) = NULL; res = gst_task_set_state (task, GST_TASK_STOPPED); + /* unblock activation waits if any */ + pad->priv->in_activation = FALSE; + g_cond_broadcast (&pad->priv->activation_cond); GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_LOCK (pad); @@ -5709,7 +6343,7 @@ join_failed: * gst_pad_probe_info_get_event: * @info: a #GstPadProbeInfo * - * Returns: (transfer none): The #GstEvent from the probe + * Returns: (transfer none) (nullable): The #GstEvent from the probe */ GstEvent * @@ -5726,7 +6360,7 @@ gst_pad_probe_info_get_event (GstPadProbeInfo * info) * gst_pad_probe_info_get_query: * @info: a #GstPadProbeInfo * - * Returns: (transfer none): The #GstQuery from the probe + * Returns: (transfer none) (nullable): The #GstQuery from the probe */ GstQuery * @@ -5742,7 +6376,7 @@ gst_pad_probe_info_get_query (GstPadProbeInfo * info) * gst_pad_probe_info_get_buffer: * @info: a #GstPadProbeInfo * - * Returns: (transfer none): The #GstBuffer from the probe + * Returns: (transfer none) (nullable): The #GstBuffer from the probe */ GstBuffer * @@ -5754,10 +6388,10 @@ gst_pad_probe_info_get_buffer (GstPadProbeInfo * info) } /** - * gst_pad_probe_info_get_bufferlist: + * gst_pad_probe_info_get_buffer_list: * @info: a #GstPadProbeInfo * - * Returns: (transfer none): The #GstBufferlist from the probe + * Returns: (transfer none) (nullable): The #GstBufferList from the probe */ GstBufferList *