X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=plugins%2Felements%2Fgsttee.c;h=0ff7527d488fb3f9f3e2c67424a9773546ac417d;hb=44623cacd637efb8f6cf93512238cdad575b9f60;hp=4b36ab1fc5115bcebe8b387eef89171541847448;hpb=a1d82bec39304f246f6b5bd06c0c7951ca404373;p=platform%2Fupstream%2Fgstreamer.git diff --git a/plugins/elements/gsttee.c b/plugins/elements/gsttee.c index 4b36ab1..0ff7527 100644 --- a/plugins/elements/gsttee.c +++ b/plugins/elements/gsttee.c @@ -17,12 +17,13 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** * SECTION:element-tee + * @title: tee * @see_also: #GstIdentity * * Split data to multiple pads. Branching the data flow is useful when e.g. @@ -34,13 +35,14 @@ * provide separate threads for each branch. Otherwise a blocked dataflow in one * branch would stall the other branches. * - * - * Example launch line + * ## Example launch line * |[ - * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! ffmpegcolorspace ! autovideosink - * ]| Play a song.ogg from local dir and render visualisations using the goom - * element. - * + * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink + * ]| + * + * Play song.ogg audio file which must be in the current working directory + * and render visualisations using the goom element (this can be easier done + * using the playbin element, this is just an example pipeline). */ #ifdef HAVE_CONFIG_H @@ -48,8 +50,10 @@ #endif #include "gsttee.h" +#include "gst/glib-compat-private.h" #include +#include static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -77,52 +81,91 @@ gst_tee_pull_mode_get_type (void) return type; } -/* lock to protect request pads from being removed while downstream */ -#define GST_TEE_DYN_LOCK(tee) g_mutex_lock ((tee)->dyn_lock) -#define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock ((tee)->dyn_lock) - #define DEFAULT_PROP_NUM_SRC_PADS 0 -#define DEFAULT_PROP_HAS_SINK_LOOP FALSE #define DEFAULT_PROP_HAS_CHAIN TRUE #define DEFAULT_PROP_SILENT TRUE #define DEFAULT_PROP_LAST_MESSAGE NULL #define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER +#define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE enum { PROP_0, PROP_NUM_SRC_PADS, - PROP_HAS_SINK_LOOP, PROP_HAS_CHAIN, PROP_SILENT, PROP_LAST_MESSAGE, PROP_PULL_MODE, PROP_ALLOC_PAD, + PROP_ALLOW_NOT_LINKED, }; -static GstStaticPadTemplate tee_src_template = GST_STATIC_PAD_TEMPLATE ("src%d", +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, GST_PAD_REQUEST, GST_STATIC_CAPS_ANY); -/* structure and quark to keep track of which pads have been pushed */ -static GQuark push_data; - #define _do_init \ - GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); \ - push_data = g_quark_from_static_string ("tee-push-data"); + GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); #define gst_tee_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init); static GParamSpec *pspec_last_message = NULL; static GParamSpec *pspec_alloc_pad = NULL; -typedef struct +GType gst_tee_pad_get_type (void); + +#define GST_TYPE_TEE_PAD \ + (gst_tee_pad_get_type()) +#define GST_TEE_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad)) +#define GST_TEE_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass)) +#define GST_IS_TEE_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD)) +#define GST_IS_TEE_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD)) +#define GST_TEE_PAD_CAST(obj) \ + ((GstTeePad *)(obj)) + +typedef struct _GstTeePad GstTeePad; +typedef struct _GstTeePadClass GstTeePadClass; + +struct _GstTeePad { + GstPad parent; + + guint index; gboolean pushed; GstFlowReturn result; gboolean removed; -} PushData; +}; + +struct _GstTeePadClass +{ + GstPadClass parent; +}; + +G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD); + +static void +gst_tee_pad_class_init (GstTeePadClass * klass) +{ +} + +static void +gst_tee_pad_reset (GstTeePad * pad) +{ + pad->pushed = FALSE; + pad->result = GST_FLOW_NOT_LINKED; + pad->removed = FALSE; +} + +static void +gst_tee_pad_init (GstTeePad * pad) +{ + gst_tee_pad_reset (pad); +} static GstPad *gst_tee_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * unused, const GstCaps * caps); @@ -135,15 +178,22 @@ static void gst_tee_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_tee_dispose (GObject * object); -static GstFlowReturn gst_tee_chain (GstPad * pad, GstBuffer * buffer); -static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstBufferList * list); -static gboolean gst_tee_sink_event (GstPad * pad, GstEvent * event); -static gboolean gst_tee_sink_acceptcaps (GstPad * pad, GstCaps * caps); -static gboolean gst_tee_sink_activate_push (GstPad * pad, gboolean active); -static gboolean gst_tee_src_query (GstPad * pad, GstQuery * query); -static gboolean gst_tee_src_activate_pull (GstPad * pad, gboolean active); -static GstFlowReturn gst_tee_src_get_range (GstPad * pad, guint64 offset, - guint length, GstBuffer ** buf); +static GstFlowReturn gst_tee_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer); +static GstFlowReturn gst_tee_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list); +static gboolean gst_tee_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static gboolean gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); +static gboolean gst_tee_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static gboolean gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, + GstPadMode mode, gboolean active); +static GstFlowReturn gst_tee_src_get_range (GstPad * pad, GstObject * parent, + guint64 offset, guint length, GstBuffer ** buf); static void gst_tee_dispose (GObject * object) @@ -169,9 +219,9 @@ gst_tee_finalize (GObject * object) tee = GST_TEE (object); - g_free (tee->last_message); + g_hash_table_unref (tee->pad_indexes); - g_mutex_free (tee->dyn_lock); + g_free (tee->last_message); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -194,11 +244,6 @@ gst_tee_class_init (GstTeeClass * klass) g_param_spec_int ("num-src-pads", "Num Src Pads", "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP, - g_param_spec_boolean ("has-sink-loop", "Has Sink Loop", - "If the element should spawn a thread (unimplemented and deprecated)", - DEFAULT_PROP_HAS_SINK_LOOP, - G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_HAS_CHAIN, g_param_spec_boolean ("has-chain", "Has Chain", "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN, @@ -218,20 +263,36 @@ gst_tee_class_init (GstTeeClass * klass) DEFAULT_PULL_MODE, G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad", - "The pad used for gst_pad_alloc_buffer", GST_TYPE_PAD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)", + GST_TYPE_PAD, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED); g_object_class_install_property (gobject_class, PROP_ALLOC_PAD, pspec_alloc_pad); - gst_element_class_set_details_simple (gstelement_class, + /** + * GstTee:allow-not-linked + * + * This property makes sink pad return GST_FLOW_OK even if there are no + * source pads or any of them is linked. + * + * This is useful to avoid errors when you have a dynamic pipeline and during + * a reconnection you can have all the pads unlinked or removed. + * + * Since: 1.6 + */ + g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED, + g_param_spec_boolean ("allow-not-linked", "Allow not linked", + "Return GST_FLOW_OK even if there are no source pads or they are " + "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_set_static_metadata (gstelement_class, "Tee pipe fitting", "Generic", "1-to-N pipe fitting", "Erik Walthinsen , " "Wim Taymans "); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&tee_src_template)); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_tee_request_new_pad); @@ -241,86 +302,102 @@ gst_tee_class_init (GstTeeClass * klass) static void gst_tee_init (GstTee * tee) { - tee->dyn_lock = g_mutex_new (); - tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); - tee->sink_mode = GST_PAD_ACTIVATE_NONE; + tee->sink_mode = GST_PAD_MODE_NONE; gst_pad_set_event_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_sink_event)); - gst_pad_set_getcaps_function (tee->sinkpad, - GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps)); - gst_pad_set_acceptcaps_function (tee->sinkpad, - GST_DEBUG_FUNCPTR (gst_tee_sink_acceptcaps)); - gst_pad_set_activatepush_function (tee->sinkpad, - GST_DEBUG_FUNCPTR (gst_tee_sink_activate_push)); + gst_pad_set_query_function (tee->sinkpad, + GST_DEBUG_FUNCPTR (gst_tee_sink_query)); + gst_pad_set_activatemode_function (tee->sinkpad, + GST_DEBUG_FUNCPTR (gst_tee_sink_activate_mode)); gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain)); gst_pad_set_chain_list_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain_list)); + GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS); gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad); + tee->pad_indexes = g_hash_table_new (NULL, NULL); + tee->last_message = NULL; } static void gst_tee_notify_alloc_pad (GstTee * tee) { -#if !GLIB_CHECK_VERSION(2,26,0) - g_object_notify ((GObject *) tee, "alloc-pad"); -#else g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad); -#endif } -static GstFlowReturn -forward_sticky_events (GstPad * pad, GstEvent * event, gpointer user_data) +static gboolean +forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) { GstPad *srcpad = GST_PAD_CAST (user_data); + GstFlowReturn ret; - gst_pad_push_event (srcpad, gst_event_ref (event)); + ret = gst_pad_store_sticky_event (srcpad, *event); + if (ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event, + GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret)); + } - return GST_FLOW_OK; + return TRUE; } static GstPad * gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ, - const gchar * unused, const GstCaps * caps) + const gchar * name_templ, const GstCaps * caps) { gchar *name; GstPad *srcpad; GstTee *tee; - GstPadActivateMode mode; + GstPadMode mode; gboolean res; - PushData *data; + guint index = 0; tee = GST_TEE (element); GST_DEBUG_OBJECT (tee, "requesting pad"); GST_OBJECT_LOCK (tee); - name = g_strdup_printf ("src%d", tee->pad_counter++); - srcpad = gst_pad_new_from_template (templ, name); + if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) { + GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index); + if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) { + GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ); + GST_OBJECT_UNLOCK (tee); + return NULL; + } + if (index >= tee->next_pad_index) + tee->next_pad_index = index + 1; + } else { + index = tee->next_pad_index; + + while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) + index++; + + tee->next_pad_index = index + 1; + } + + g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL); + + name = g_strdup_printf ("src_%u", index); + + srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD, + "name", name, "direction", templ->direction, "template", templ, + NULL)); + GST_TEE_PAD_CAST (srcpad)->index = index; g_free (name); mode = tee->sink_mode; - /* install the data, we automatically free it when the pad is disposed because - * of _release_pad or when the element goes away. */ - data = g_new0 (PushData, 1); - data->pushed = FALSE; - data->result = GST_FLOW_NOT_LINKED; - data->removed = FALSE; - g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free); - GST_OBJECT_UNLOCK (tee); switch (mode) { - case GST_PAD_ACTIVATE_PULL: + case GST_PAD_MODE_PULL: /* we already have a src pad in pull mode, and our pull mode can only be SINGLE, so fall through to activate this new pad in push mode */ - case GST_PAD_ACTIVATE_PUSH: - res = gst_pad_activate_push (srcpad, TRUE); + case GST_PAD_MODE_PUSH: + res = gst_pad_activate_mode (srcpad, GST_PAD_MODE_PUSH, TRUE); break; default: res = TRUE; @@ -330,13 +407,12 @@ gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ, if (!res) goto activate_failed; - gst_pad_set_getcaps_function (srcpad, - GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps)); - gst_pad_set_activatepull_function (srcpad, - GST_DEBUG_FUNCPTR (gst_tee_src_activate_pull)); + gst_pad_set_activatemode_function (srcpad, + GST_DEBUG_FUNCPTR (gst_tee_src_activate_mode)); gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query)); gst_pad_set_getrange_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_get_range)); + GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS); /* Forward sticky events to the new srcpad */ gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad); gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad); @@ -367,37 +443,33 @@ static void gst_tee_release_pad (GstElement * element, GstPad * pad) { GstTee *tee; - PushData *data; gboolean changed = FALSE; + guint index; tee = GST_TEE (element); GST_DEBUG_OBJECT (tee, "releasing pad"); - /* wait for pending pad_alloc to finish */ - GST_TEE_DYN_LOCK (tee); - data = g_object_get_qdata (G_OBJECT (pad), push_data); - GST_OBJECT_LOCK (tee); + index = GST_TEE_PAD_CAST (pad)->index; /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */ - data->removed = TRUE; + GST_TEE_PAD_CAST (pad)->removed = TRUE; if (tee->allocpad == pad) { tee->allocpad = NULL; changed = TRUE; } GST_OBJECT_UNLOCK (tee); - gst_object_ref (pad); - gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad); - gst_pad_set_active (pad, FALSE); - GST_TEE_DYN_UNLOCK (tee); - - gst_object_unref (pad); + gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad); if (changed) { gst_tee_notify_alloc_pad (tee); } + + GST_OBJECT_LOCK (tee); + g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index)); + GST_OBJECT_UNLOCK (tee); } static void @@ -408,12 +480,6 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, GST_OBJECT_LOCK (tee); switch (prop_id) { - case PROP_HAS_SINK_LOOP: - tee->has_sink_loop = g_value_get_boolean (value); - if (tee->has_sink_loop) { - g_warning ("tee will never implement has-sink-loop==TRUE"); - } - break; case PROP_HAS_CHAIN: tee->has_chain = g_value_get_boolean (value); break; @@ -421,7 +487,7 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, tee->silent = g_value_get_boolean (value); break; case PROP_PULL_MODE: - tee->pull_mode = g_value_get_enum (value); + tee->pull_mode = (GstTeePullMode) g_value_get_enum (value); break; case PROP_ALLOC_PAD: { @@ -435,6 +501,9 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, GST_OBJECT_UNLOCK (pad); break; } + case PROP_ALLOW_NOT_LINKED: + tee->allow_not_linked = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -453,9 +522,6 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, case PROP_NUM_SRC_PADS: g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads); break; - case PROP_HAS_SINK_LOOP: - g_value_set_boolean (value, tee->has_sink_loop); - break; case PROP_HAS_CHAIN: g_value_set_boolean (value, tee->has_chain); break; @@ -471,6 +537,9 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, case PROP_ALLOC_PAD: g_value_set_object (value, tee->allocpad); break; + case PROP_ALLOW_NOT_LINKED: + g_value_set_boolean (value, tee->allow_not_linked); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -479,56 +548,270 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, } static gboolean -gst_tee_sink_event (GstPad * pad, GstEvent * event) +gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { gboolean res; switch (GST_EVENT_TYPE (event)) { default: - res = gst_pad_event_default (pad, event); + res = gst_pad_event_default (pad, parent, event); break; } return res; } -/* on the sink we accept caps that are acceptable to all srcpads */ -static gboolean -gst_tee_sink_acceptcaps (GstPad * pad, GstCaps * caps) +struct AllocQueryCtx { GstTee *tee; - gboolean res, done; - GstIterator *it; - GValue item = { 0, }; - - tee = GST_TEE_CAST (GST_PAD_PARENT (pad)); - - it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (tee)); - - res = TRUE; - done = FALSE; - while (!done && res) { - switch (gst_iterator_next (it, &item)) { - case GST_ITERATOR_OK: - res &= gst_pad_peer_accept_caps (g_value_get_object (&item), caps); - g_value_reset (&item); - break; - case GST_ITERATOR_RESYNC: - res = TRUE; - gst_iterator_resync (it); - break; - case GST_ITERATOR_ERROR: - res = FALSE; - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_pads; +}; + +/* This function will aggregate some of the allocation query information with + * the strategy to force upstream allocation. Depending on downstream + * allocation would otherwise make dynamic pipelines much more complicated as + * application would need to now drain buffer in certain cases before getting + * rid of a tee branch. */ +static gboolean +gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data) +{ + struct AllocQueryCtx *ctx = user_data; + GstPad *src_pad = g_value_get_object (item); + GstPad *peer_pad; + GstCaps *caps; + GstQuery *query; + guint count, i, size, min; + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s", + GST_DEBUG_PAD_NAME (src_pad)); + + peer_pad = gst_pad_get_peer (src_pad); + if (!peer_pad) { + if (ctx->tee->allow_not_linked) { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.", + GST_DEBUG_PAD_NAME (src_pad)); + return TRUE; + } else { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.", + GST_DEBUG_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + return FALSE; + } + } + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_pad_query (peer_pad, query)) { + GST_DEBUG_OBJECT (ctx->tee, + "Allocation query failed on pad %s, ignoring allocation", + GST_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + gst_query_unref (query); + gst_object_unref (peer_pad); + return FALSE; + } + + gst_object_unref (peer_pad); + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation */ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (ctx->tee, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * pad. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; } - g_value_unset (&item); - gst_iterator_free (it); + ctx->first_query = FALSE; + ctx->num_pads++; + gst_query_unref (query); + + return TRUE; +} + + +static void +gst_tee_clear_query_allocation_meta (GstQuery * query) +{ + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) + gst_query_remove_nth_allocation_meta (query, count - i); +} + +static gboolean +gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstTee *tee = GST_TEE (parent); + gboolean res; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + { + GstIterator *iter; + GValue ret = G_VALUE_INIT; + struct AllocQueryCtx ctx = { tee, query, }; + + g_value_init (&ret, G_TYPE_BOOLEAN); + g_value_set_boolean (&ret, TRUE); + + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); + while (GST_ITERATOR_RESYNC == + gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) { + gst_iterator_resync (iter); + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + ctx.size = 0; + ctx.min_buffers = 0; + ctx.num_pads = 0; + gst_tee_clear_query_allocation_meta (query); + } + + gst_iterator_free (iter); + res = g_value_get_boolean (&ret); + g_value_unset (&ret); + + if (res) { + GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix, + ctx.params.padding); + + GST_DEBUG_OBJECT (tee, + "Aggregated allocation pools size=%u min_buffers=%u", ctx.size, + ctx.min_buffers); + +#ifndef GST_DISABLE_GST_DEBUG + { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count); + + for (i = 0; i < count; i++) + GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } +#endif + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_pads > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + } else { + gst_tee_clear_query_allocation_meta (query); + } + break; + } + default: + res = gst_pad_query_default (pad, parent, query); + break; + } return res; } @@ -549,11 +832,7 @@ gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list) } GST_OBJECT_UNLOCK (tee); -#if !GLIB_CHECK_VERSION(2,26,0) - g_object_notify ((GObject *) tee, "last-message"); -#else g_object_notify_by_pspec ((GObject *) tee, pspec_last_message); -#endif } static GstFlowReturn @@ -578,15 +857,8 @@ gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list) static void clear_pads (GstPad * pad, GstTee * tee) { - PushData *data; - - data = g_object_get_qdata ((GObject *) pad, push_data); - - /* the data must be there or we have a screwed up internal state */ - g_assert (data != NULL); - - data->pushed = FALSE; - data->result = GST_FLOW_NOT_LINKED; + GST_TEE_PAD_CAST (pad)->pushed = FALSE; + GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED; } static GstFlowReturn @@ -610,6 +882,9 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) if (!pads->next) { GstPad *pad = GST_PAD_CAST (pads->data); + /* Keep another ref around, a pad probe + * might release and destroy the pad */ + gst_object_ref (pad); GST_OBJECT_UNLOCK (tee); if (pad == tee->pull_pad) { @@ -619,6 +894,18 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) } else { ret = gst_pad_push (pad, GST_BUFFER_CAST (data)); } + + GST_OBJECT_LOCK (tee); + if (GST_TEE_PAD_CAST (pad)->removed) + ret = GST_FLOW_NOT_LINKED; + + if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) { + ret = GST_FLOW_OK; + } + GST_OBJECT_UNLOCK (tee); + + gst_object_unref (pad); + return ret; } @@ -626,45 +913,44 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) g_list_foreach (pads, (GFunc) clear_pads, tee); restart: - cret = GST_FLOW_NOT_LINKED; + if (tee->allow_not_linked) { + cret = GST_FLOW_OK; + } else { + cret = GST_FLOW_NOT_LINKED; + } pads = GST_ELEMENT_CAST (tee)->srcpads; cookie = GST_ELEMENT_CAST (tee)->pads_cookie; while (pads) { GstPad *pad; - PushData *pdata; pad = GST_PAD_CAST (pads->data); - /* get the private data, something is really wrong with the internal state - * when it is not there */ - pdata = g_object_get_qdata ((GObject *) pad, push_data); - g_assert (pdata != NULL); - - if (G_LIKELY (!pdata->pushed)) { + if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) { /* not yet pushed, release lock and start pushing */ gst_object_ref (pad); GST_OBJECT_UNLOCK (tee); - GST_LOG_OBJECT (tee, "Starting to push %s %p", + GST_LOG_OBJECT (pad, "Starting to push %s %p", is_list ? "list" : "buffer", data); ret = gst_tee_do_push (tee, pad, data, is_list); - GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data, + GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data, gst_flow_get_name (ret)); GST_OBJECT_LOCK (tee); - /* keep track of which pad we pushed and the result value. We need to do - * this before we release the refcount on the pad, the PushData is - * destroyed when the last ref of the pad goes away. */ - pdata->pushed = TRUE; - pdata->result = ret; + /* keep track of which pad we pushed and the result value */ + if (GST_TEE_PAD_CAST (pad)->removed) + ret = GST_FLOW_NOT_LINKED; + GST_TEE_PAD_CAST (pad)->pushed = TRUE; + GST_TEE_PAD_CAST (pad)->result = ret; gst_object_unref (pad); + pad = NULL; } else { /* already pushed, use previous return value */ - ret = pdata->result; - GST_LOG_OBJECT (tee, "pad already pushed with %s", + ret = GST_TEE_PAD_CAST (pad)->result; + GST_LOG_OBJECT (pad, "pad already pushed with %s", gst_flow_get_name (ret)); } @@ -700,13 +986,23 @@ restart: /* ERRORS */ no_pads: { - GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked"); - ret = GST_FLOW_NOT_LINKED; - goto error; + if (tee->allow_not_linked) { + GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s", + is_list ? "buffer-list" : "buffer"); + ret = GST_FLOW_OK; + } else { + GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked"); + ret = GST_FLOW_NOT_LINKED; + } + goto end; } error: { GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret)); + goto end; + } +end: + { GST_OBJECT_UNLOCK (tee); gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return ret; @@ -714,12 +1010,12 @@ error: } static GstFlowReturn -gst_tee_chain (GstPad * pad, GstBuffer * buffer) +gst_tee_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) { GstFlowReturn res; GstTee *tee; - tee = GST_TEE_CAST (GST_OBJECT_PARENT (pad)); + tee = GST_TEE_CAST (parent); GST_DEBUG_OBJECT (tee, "received buffer %p", buffer); @@ -731,12 +1027,12 @@ gst_tee_chain (GstPad * pad, GstBuffer * buffer) } static GstFlowReturn -gst_tee_chain_list (GstPad * pad, GstBufferList * list) +gst_tee_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list) { GstFlowReturn res; GstTee *tee; - tee = GST_TEE_CAST (gst_pad_get_parent (pad)); + tee = GST_TEE_CAST (parent); GST_DEBUG_OBJECT (tee, "received list %p", list); @@ -744,26 +1040,35 @@ gst_tee_chain_list (GstPad * pad, GstBufferList * list) GST_DEBUG_OBJECT (tee, "handled list %s", gst_flow_get_name (res)); - gst_object_unref (tee); - return res; } static gboolean -gst_tee_sink_activate_push (GstPad * pad, gboolean active) +gst_tee_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) { + gboolean res; GstTee *tee; - tee = GST_TEE (GST_OBJECT_PARENT (pad)); - - GST_OBJECT_LOCK (tee); - tee->sink_mode = active && GST_PAD_ACTIVATE_PUSH; + tee = GST_TEE (parent); - if (active && !tee->has_chain) - goto no_chain; - GST_OBJECT_UNLOCK (tee); + switch (mode) { + case GST_PAD_MODE_PUSH: + { + GST_OBJECT_LOCK (tee); + tee->sink_mode = active ? mode : GST_PAD_MODE_NONE; - return TRUE; + if (active && !tee->has_chain) + goto no_chain; + GST_OBJECT_UNLOCK (tee); + res = TRUE; + break; + } + default: + res = FALSE; + break; + } + return res; /* ERRORS */ no_chain: @@ -776,44 +1081,52 @@ no_chain: } static gboolean -gst_tee_src_activate_pull (GstPad * pad, gboolean active) +gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, + gboolean active) { GstTee *tee; gboolean res; GstPad *sinkpad; - tee = GST_TEE (gst_pad_get_parent (pad)); + tee = GST_TEE (parent); - GST_OBJECT_LOCK (tee); + switch (mode) { + case GST_PAD_MODE_PULL: + { + GST_OBJECT_LOCK (tee); - if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) - goto cannot_pull; + if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER) + goto cannot_pull; - if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad) - goto cannot_pull_multiple_srcs; + if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad) + goto cannot_pull_multiple_srcs; - sinkpad = gst_object_ref (tee->sinkpad); + sinkpad = gst_object_ref (tee->sinkpad); - GST_OBJECT_UNLOCK (tee); + GST_OBJECT_UNLOCK (tee); - res = gst_pad_activate_pull (sinkpad, active); - gst_object_unref (sinkpad); + res = gst_pad_activate_mode (sinkpad, GST_PAD_MODE_PULL, active); + gst_object_unref (sinkpad); - if (!res) - goto sink_activate_failed; + if (!res) + goto sink_activate_failed; - GST_OBJECT_LOCK (tee); - if (active) { - if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE) - tee->pull_pad = pad; - } else { - if (pad == tee->pull_pad) - tee->pull_pad = NULL; + GST_OBJECT_LOCK (tee); + if (active) { + if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE) + tee->pull_pad = pad; + } else { + if (pad == tee->pull_pad) + tee->pull_pad = NULL; + } + tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE); + GST_OBJECT_UNLOCK (tee); + break; + } + default: + res = TRUE; + break; } - tee->sink_mode = active & GST_PAD_ACTIVATE_PULL; - GST_OBJECT_UNLOCK (tee); - - gst_object_unref (tee); return res; @@ -823,7 +1136,6 @@ cannot_pull: GST_OBJECT_UNLOCK (tee); GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode " "set to NEVER"); - gst_object_unref (tee); return FALSE; } cannot_pull_multiple_srcs: @@ -831,26 +1143,24 @@ cannot_pull_multiple_srcs: GST_OBJECT_UNLOCK (tee); GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, " "pull-mode set to SINGLE"); - gst_object_unref (tee); return FALSE; } sink_activate_failed: { GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode", active ? "" : "de"); - gst_object_unref (tee); return FALSE; } } static gboolean -gst_tee_src_query (GstPad * pad, GstQuery * query) +gst_tee_src_query (GstPad * pad, GstObject * parent, GstQuery * query) { GstTee *tee; gboolean res; GstPad *sinkpad; - tee = GST_TEE (gst_pad_get_parent (pad)); + tee = GST_TEE (parent); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_SCHEDULING: @@ -882,12 +1192,10 @@ gst_tee_src_query (GstPad * pad, GstQuery * query) break; } default: - res = gst_pad_query_default (pad, query); + res = gst_pad_query_default (pad, parent, query); break; } - gst_object_unref (tee); - return res; } @@ -906,19 +1214,21 @@ gst_tee_pull_eos (GstTee * tee) GstIterator *iter; iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); - gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos, - tee); + while (gst_iterator_foreach (iter, + (GstIteratorForeachFunction) gst_tee_push_eos, + tee) == GST_ITERATOR_RESYNC) + gst_iterator_resync (iter); gst_iterator_free (iter); } static GstFlowReturn -gst_tee_src_get_range (GstPad * pad, guint64 offset, guint length, - GstBuffer ** buf) +gst_tee_src_get_range (GstPad * pad, GstObject * parent, guint64 offset, + guint length, GstBuffer ** buf) { GstTee *tee; GstFlowReturn ret; - tee = GST_TEE (gst_pad_get_parent (pad)); + tee = GST_TEE (parent); ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf); @@ -927,7 +1237,5 @@ gst_tee_src_get_range (GstPad * pad, guint64 offset, guint length, else if (ret == GST_FLOW_EOS) gst_tee_pull_eos (tee); - gst_object_unref (tee); - return ret; }