X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=plugins%2Felements%2Fgsttee.c;h=0ff7527d488fb3f9f3e2c67424a9773546ac417d;hb=44623cacd637efb8f6cf93512238cdad575b9f60;hp=d27d202bfe9929778e89069efa92b87f1973817a;hpb=66d19b65fb6eee8283a6c2ac742be251f66d631c;p=platform%2Fupstream%2Fgstreamer.git diff --git a/plugins/elements/gsttee.c b/plugins/elements/gsttee.c index d27d202..0ff7527 100644 --- a/plugins/elements/gsttee.c +++ b/plugins/elements/gsttee.c @@ -17,12 +17,13 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ /** * SECTION:element-tee + * @title: tee * @see_also: #GstIdentity * * Split data to multiple pads. Branching the data flow is useful when e.g. @@ -34,13 +35,14 @@ * provide separate threads for each branch. Otherwise a blocked dataflow in one * branch would stall the other branches. * - * - * Example launch line + * ## Example launch line * |[ - * gst-launch filesrc location=song.ogg ! decodebin2 ! tee name=t ! queue ! autoaudiosink t. ! queue ! audioconvert ! goom ! ffmpegcolorspace ! autovideosink - * ]| Play a song.ogg from local dir and render visualisations using the goom - * element. - * + * gst-launch-1.0 filesrc location=song.ogg ! decodebin ! tee name=t ! queue ! audioconvert ! audioresample ! autoaudiosink t. ! queue ! audioconvert ! goom ! videoconvert ! autovideosink + * ]| + * + * Play song.ogg audio file which must be in the current working directory + * and render visualisations using the goom element (this can be easier done + * using the playbin element, this is just an example pipeline). */ #ifdef HAVE_CONFIG_H @@ -51,6 +53,7 @@ #include "gst/glib-compat-private.h" #include +#include static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -78,53 +81,91 @@ gst_tee_pull_mode_get_type (void) return type; } -/* lock to protect request pads from being removed while downstream */ -#define GST_TEE_DYN_LOCK(tee) g_mutex_lock ((tee)->dyn_lock) -#define GST_TEE_DYN_UNLOCK(tee) g_mutex_unlock ((tee)->dyn_lock) - #define DEFAULT_PROP_NUM_SRC_PADS 0 -#define DEFAULT_PROP_HAS_SINK_LOOP FALSE #define DEFAULT_PROP_HAS_CHAIN TRUE #define DEFAULT_PROP_SILENT TRUE #define DEFAULT_PROP_LAST_MESSAGE NULL #define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER +#define DEFAULT_PROP_ALLOW_NOT_LINKED FALSE enum { PROP_0, PROP_NUM_SRC_PADS, - PROP_HAS_SINK_LOOP, PROP_HAS_CHAIN, PROP_SILENT, PROP_LAST_MESSAGE, PROP_PULL_MODE, PROP_ALLOC_PAD, + PROP_ALLOW_NOT_LINKED, }; -static GstStaticPadTemplate tee_src_template = -GST_STATIC_PAD_TEMPLATE ("src_%u", +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC, GST_PAD_REQUEST, GST_STATIC_CAPS_ANY); -/* structure and quark to keep track of which pads have been pushed */ -static GQuark push_data; - #define _do_init \ - GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); \ - push_data = g_quark_from_static_string ("tee-push-data"); + GST_DEBUG_CATEGORY_INIT (gst_tee_debug, "tee", 0, "tee element"); #define gst_tee_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (GstTee, gst_tee, GST_TYPE_ELEMENT, _do_init); static GParamSpec *pspec_last_message = NULL; static GParamSpec *pspec_alloc_pad = NULL; -typedef struct +GType gst_tee_pad_get_type (void); + +#define GST_TYPE_TEE_PAD \ + (gst_tee_pad_get_type()) +#define GST_TEE_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEE_PAD, GstTeePad)) +#define GST_TEE_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEE_PAD, GstTeePadClass)) +#define GST_IS_TEE_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_TEE_PAD)) +#define GST_IS_TEE_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_TEE_PAD)) +#define GST_TEE_PAD_CAST(obj) \ + ((GstTeePad *)(obj)) + +typedef struct _GstTeePad GstTeePad; +typedef struct _GstTeePadClass GstTeePadClass; + +struct _GstTeePad { + GstPad parent; + + guint index; gboolean pushed; GstFlowReturn result; gboolean removed; -} PushData; +}; + +struct _GstTeePadClass +{ + GstPadClass parent; +}; + +G_DEFINE_TYPE (GstTeePad, gst_tee_pad, GST_TYPE_PAD); + +static void +gst_tee_pad_class_init (GstTeePadClass * klass) +{ +} + +static void +gst_tee_pad_reset (GstTeePad * pad) +{ + pad->pushed = FALSE; + pad->result = GST_FLOW_NOT_LINKED; + pad->removed = FALSE; +} + +static void +gst_tee_pad_init (GstTeePad * pad) +{ + gst_tee_pad_reset (pad); +} static GstPad *gst_tee_request_new_pad (GstElement * element, GstPadTemplate * temp, const gchar * unused, const GstCaps * caps); @@ -178,9 +219,9 @@ gst_tee_finalize (GObject * object) tee = GST_TEE (object); - g_free (tee->last_message); + g_hash_table_unref (tee->pad_indexes); - g_mutex_free (tee->dyn_lock); + g_free (tee->last_message); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -203,11 +244,6 @@ gst_tee_class_init (GstTeeClass * klass) g_param_spec_int ("num-src-pads", "Num Src Pads", "The number of source pads", 0, G_MAXINT, DEFAULT_PROP_NUM_SRC_PADS, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP, - g_param_spec_boolean ("has-sink-loop", "Has Sink Loop", - "If the element should spawn a thread (unimplemented and deprecated)", - DEFAULT_PROP_HAS_SINK_LOOP, - G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_HAS_CHAIN, g_param_spec_boolean ("has-chain", "Has Chain", "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN, @@ -227,20 +263,36 @@ gst_tee_class_init (GstTeeClass * klass) DEFAULT_PULL_MODE, G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); pspec_alloc_pad = g_param_spec_object ("alloc-pad", "Allocation Src Pad", - "The pad used for gst_pad_alloc_buffer", GST_TYPE_PAD, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS); + "The pad ALLOCATION queries will be proxied to (DEPRECATED, has no effect)", + GST_TYPE_PAD, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED); g_object_class_install_property (gobject_class, PROP_ALLOC_PAD, pspec_alloc_pad); - gst_element_class_set_details_simple (gstelement_class, + /** + * GstTee:allow-not-linked + * + * This property makes sink pad return GST_FLOW_OK even if there are no + * source pads or any of them is linked. + * + * This is useful to avoid errors when you have a dynamic pipeline and during + * a reconnection you can have all the pads unlinked or removed. + * + * Since: 1.6 + */ + g_object_class_install_property (gobject_class, PROP_ALLOW_NOT_LINKED, + g_param_spec_boolean ("allow-not-linked", "Allow not linked", + "Return GST_FLOW_OK even if there are no source pads or they are " + "all unlinked", DEFAULT_PROP_ALLOW_NOT_LINKED, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_set_static_metadata (gstelement_class, "Tee pipe fitting", "Generic", "1-to-N pipe fitting", "Erik Walthinsen , " "Wim Taymans "); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&sinktemplate)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&tee_src_template)); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_tee_request_new_pad); @@ -250,8 +302,6 @@ gst_tee_class_init (GstTeeClass * klass) static void gst_tee_init (GstTee * tee) { - tee->dyn_lock = g_mutex_new (); - tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); tee->sink_mode = GST_PAD_MODE_NONE; @@ -267,60 +317,79 @@ gst_tee_init (GstTee * tee) GST_OBJECT_FLAG_SET (tee->sinkpad, GST_PAD_FLAG_PROXY_CAPS); gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad); + tee->pad_indexes = g_hash_table_new (NULL, NULL); + tee->last_message = NULL; } static void gst_tee_notify_alloc_pad (GstTee * tee) { -#if !GLIB_CHECK_VERSION(2,26,0) - g_object_notify ((GObject *) tee, "alloc-pad"); -#else g_object_notify_by_pspec ((GObject *) tee, pspec_alloc_pad); -#endif } static gboolean forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) { GstPad *srcpad = GST_PAD_CAST (user_data); + GstFlowReturn ret; - gst_pad_push_event (srcpad, gst_event_ref (*event)); + ret = gst_pad_store_sticky_event (srcpad, *event); + if (ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event, + GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret)); + } return TRUE; } static GstPad * gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ, - const gchar * unused, const GstCaps * caps) + const gchar * name_templ, const GstCaps * caps) { gchar *name; GstPad *srcpad; GstTee *tee; GstPadMode mode; gboolean res; - PushData *data; + guint index = 0; tee = GST_TEE (element); GST_DEBUG_OBJECT (tee, "requesting pad"); GST_OBJECT_LOCK (tee); - name = g_strdup_printf ("src_%u", tee->pad_counter++); - srcpad = gst_pad_new_from_template (templ, name); + if (name_templ && sscanf (name_templ, "src_%u", &index) == 1) { + GST_LOG_OBJECT (element, "name: %s (index %d)", name_templ, index); + if (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) { + GST_ERROR_OBJECT (element, "pad name %s is not unique", name_templ); + GST_OBJECT_UNLOCK (tee); + return NULL; + } + if (index >= tee->next_pad_index) + tee->next_pad_index = index + 1; + } else { + index = tee->next_pad_index; + + while (g_hash_table_contains (tee->pad_indexes, GUINT_TO_POINTER (index))) + index++; + + tee->next_pad_index = index + 1; + } + + g_hash_table_insert (tee->pad_indexes, GUINT_TO_POINTER (index), NULL); + + name = g_strdup_printf ("src_%u", index); + + srcpad = GST_PAD_CAST (g_object_new (GST_TYPE_TEE_PAD, + "name", name, "direction", templ->direction, "template", templ, + NULL)); + GST_TEE_PAD_CAST (srcpad)->index = index; g_free (name); mode = tee->sink_mode; - /* install the data, we automatically free it when the pad is disposed because - * of _release_pad or when the element goes away. */ - data = g_new0 (PushData, 1); - data->pushed = FALSE; - data->result = GST_FLOW_NOT_LINKED; - data->removed = FALSE; - g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free); - GST_OBJECT_UNLOCK (tee); switch (mode) { @@ -343,9 +412,9 @@ gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ, gst_pad_set_query_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_query)); gst_pad_set_getrange_function (srcpad, GST_DEBUG_FUNCPTR (gst_tee_src_get_range)); + GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS); /* Forward sticky events to the new srcpad */ gst_pad_sticky_events_foreach (tee->sinkpad, forward_sticky_events, srcpad); - GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PROXY_CAPS); gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad); return srcpad; @@ -374,37 +443,33 @@ static void gst_tee_release_pad (GstElement * element, GstPad * pad) { GstTee *tee; - PushData *data; gboolean changed = FALSE; + guint index; tee = GST_TEE (element); GST_DEBUG_OBJECT (tee, "releasing pad"); - /* wait for pending pad_alloc to finish */ - GST_TEE_DYN_LOCK (tee); - data = g_object_get_qdata (G_OBJECT (pad), push_data); - GST_OBJECT_LOCK (tee); + index = GST_TEE_PAD_CAST (pad)->index; /* mark the pad as removed so that future pad_alloc fails with NOT_LINKED. */ - data->removed = TRUE; + GST_TEE_PAD_CAST (pad)->removed = TRUE; if (tee->allocpad == pad) { tee->allocpad = NULL; changed = TRUE; } GST_OBJECT_UNLOCK (tee); - gst_object_ref (pad); - gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad); - gst_pad_set_active (pad, FALSE); - GST_TEE_DYN_UNLOCK (tee); - - gst_object_unref (pad); + gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad); if (changed) { gst_tee_notify_alloc_pad (tee); } + + GST_OBJECT_LOCK (tee); + g_hash_table_remove (tee->pad_indexes, GUINT_TO_POINTER (index)); + GST_OBJECT_UNLOCK (tee); } static void @@ -415,12 +480,6 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, GST_OBJECT_LOCK (tee); switch (prop_id) { - case PROP_HAS_SINK_LOOP: - tee->has_sink_loop = g_value_get_boolean (value); - if (tee->has_sink_loop) { - g_warning ("tee will never implement has-sink-loop==TRUE"); - } - break; case PROP_HAS_CHAIN: tee->has_chain = g_value_get_boolean (value); break; @@ -428,7 +487,7 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, tee->silent = g_value_get_boolean (value); break; case PROP_PULL_MODE: - tee->pull_mode = g_value_get_enum (value); + tee->pull_mode = (GstTeePullMode) g_value_get_enum (value); break; case PROP_ALLOC_PAD: { @@ -442,6 +501,9 @@ gst_tee_set_property (GObject * object, guint prop_id, const GValue * value, GST_OBJECT_UNLOCK (pad); break; } + case PROP_ALLOW_NOT_LINKED: + tee->allow_not_linked = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -460,9 +522,6 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, case PROP_NUM_SRC_PADS: g_value_set_int (value, GST_ELEMENT (tee)->numsrcpads); break; - case PROP_HAS_SINK_LOOP: - g_value_set_boolean (value, tee->has_sink_loop); - break; case PROP_HAS_CHAIN: g_value_set_boolean (value, tee->has_chain); break; @@ -478,6 +537,9 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value, case PROP_ALLOC_PAD: g_value_set_object (value, tee->allocpad); break; + case PROP_ALLOW_NOT_LINKED: + g_value_set_boolean (value, tee->allow_not_linked); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -499,12 +561,253 @@ gst_tee_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) return res; } +struct AllocQueryCtx +{ + GstTee *tee; + GstQuery *query; + GstAllocationParams params; + guint size; + guint min_buffers; + gboolean first_query; + guint num_pads; +}; + +/* This function will aggregate some of the allocation query information with + * the strategy to force upstream allocation. Depending on downstream + * allocation would otherwise make dynamic pipelines much more complicated as + * application would need to now drain buffer in certain cases before getting + * rid of a tee branch. */ +static gboolean +gst_tee_query_allocation (const GValue * item, GValue * ret, gpointer user_data) +{ + struct AllocQueryCtx *ctx = user_data; + GstPad *src_pad = g_value_get_object (item); + GstPad *peer_pad; + GstCaps *caps; + GstQuery *query; + guint count, i, size, min; + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating allocation from pad %s:%s", + GST_DEBUG_PAD_NAME (src_pad)); + + peer_pad = gst_pad_get_peer (src_pad); + if (!peer_pad) { + if (ctx->tee->allow_not_linked) { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, but allowed.", + GST_DEBUG_PAD_NAME (src_pad)); + return TRUE; + } else { + GST_DEBUG_OBJECT (ctx->tee, "Pad %s:%s has no peer, ignoring allocation.", + GST_DEBUG_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + return FALSE; + } + } + + gst_query_parse_allocation (ctx->query, &caps, NULL); + + query = gst_query_new_allocation (caps, FALSE); + if (!gst_pad_query (peer_pad, query)) { + GST_DEBUG_OBJECT (ctx->tee, + "Allocation query failed on pad %s, ignoring allocation", + GST_PAD_NAME (src_pad)); + g_value_set_boolean (ret, FALSE); + gst_query_unref (query); + gst_object_unref (peer_pad); + return FALSE; + } + + gst_object_unref (peer_pad); + + /* Allocation Params: + * store the maximum alignment, prefix and padding, but ignore the + * allocators and the flags which are tied to downstream allocation */ + count = gst_query_get_n_allocation_params (query); + for (i = 0; i < count; i++) { + GstAllocationParams params = { 0, }; + + gst_query_parse_nth_allocation_param (query, i, NULL, ¶ms); + + GST_DEBUG_OBJECT (ctx->tee, "Aggregating AllocationParams align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, params.align, params.prefix, params.padding); + + if (ctx->params.align < params.align) + ctx->params.align = params.align; + + if (ctx->params.prefix < params.prefix) + ctx->params.prefix = params.prefix; + + if (ctx->params.padding < params.padding) + ctx->params.padding = params.padding; + } + + /* Allocation Pool: + * We want to keep the biggest size and biggest minimum number of buffers to + * make sure downstream requirement can be satisfied. We don't really care + * about the maximum, as this is a parameter of the downstream provided + * pool. We only read the first allocation pool as the minimum number of + * buffers is normally constant regardless of the pool being used. */ + if (gst_query_get_n_allocation_pools (query) > 0) { + gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL); + + GST_DEBUG_OBJECT (ctx->tee, + "Aggregating allocation pool size=%u min_buffers=%u", size, min); + + if (ctx->size < size) + ctx->size = size; + + if (ctx->min_buffers < min) + ctx->min_buffers = min; + } + + /* Allocation Meta: + * For allocation meta, we'll need to aggregate the argument using the new + * GstMetaInfo::agggregate_func */ + count = gst_query_get_n_allocation_metas (query); + for (i = 0; i < count; i++) { + guint ctx_index; + GType api; + const GstStructure *param; + + api = gst_query_parse_nth_allocation_meta (query, i, ¶m); + + /* For the first query, copy all metas */ + if (ctx->first_query) { + gst_query_add_allocation_meta (ctx->query, api, param); + continue; + } + + /* Afterward, aggregate the common params */ + if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) { + const GstStructure *ctx_param; + + gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param); + + /* Keep meta which has no params */ + if (ctx_param == NULL && param == NULL) + continue; + + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, ctx_index); + } + } + + /* Finally, cleanup metas from the stored query that aren't support on this + * pad. */ + count = gst_query_get_n_allocation_metas (ctx->query); + for (i = 0; i < count;) { + GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL); + + if (!gst_query_find_allocation_meta (query, api, NULL)) { + GST_DEBUG_OBJECT (ctx->tee, "Dropping allocation meta %s", + g_type_name (api)); + gst_query_remove_nth_allocation_meta (ctx->query, i); + count--; + continue; + } + + i++; + } + + ctx->first_query = FALSE; + ctx->num_pads++; + gst_query_unref (query); + + return TRUE; +} + + +static void +gst_tee_clear_query_allocation_meta (GstQuery * query) +{ + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + for (i = 1; i <= count; i++) + gst_query_remove_nth_allocation_meta (query, count - i); +} + static gboolean gst_tee_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) { + GstTee *tee = GST_TEE (parent); gboolean res; switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_ALLOCATION: + { + GstIterator *iter; + GValue ret = G_VALUE_INIT; + struct AllocQueryCtx ctx = { tee, query, }; + + g_value_init (&ret, G_TYPE_BOOLEAN); + g_value_set_boolean (&ret, TRUE); + + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + + iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); + while (GST_ITERATOR_RESYNC == + gst_iterator_fold (iter, gst_tee_query_allocation, &ret, &ctx)) { + gst_iterator_resync (iter); + ctx.first_query = TRUE; + gst_allocation_params_init (&ctx.params); + ctx.size = 0; + ctx.min_buffers = 0; + ctx.num_pads = 0; + gst_tee_clear_query_allocation_meta (query); + } + + gst_iterator_free (iter); + res = g_value_get_boolean (&ret); + g_value_unset (&ret); + + if (res) { + GST_DEBUG_OBJECT (tee, "Aggregated AllocationParams to align=%" + G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%" + G_GSIZE_FORMAT, ctx.params.align, ctx.params.prefix, + ctx.params.padding); + + GST_DEBUG_OBJECT (tee, + "Aggregated allocation pools size=%u min_buffers=%u", ctx.size, + ctx.min_buffers); + +#ifndef GST_DISABLE_GST_DEBUG + { + guint count = gst_query_get_n_allocation_metas (query); + guint i; + + GST_DEBUG_OBJECT (tee, "Aggregated %u allocation meta:", count); + + for (i = 0; i < count; i++) + GST_DEBUG_OBJECT (tee, " + aggregated allocation meta %s", + g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i, + NULL))); + } +#endif + + /* Allocate one more buffers when multiplexing so we don't starve the + * downstream threads. */ + if (ctx.num_pads > 1) + ctx.min_buffers++; + + /* Check that we actually have parameters besides the defaults. */ + if (ctx.params.align || ctx.params.prefix || ctx.params.padding) { + gst_query_add_allocation_param (ctx.query, NULL, &ctx.params); + } + /* When size == 0, buffers created from this pool would have no memory + * allocated. */ + if (ctx.size) { + gst_query_add_allocation_pool (ctx.query, NULL, ctx.size, + ctx.min_buffers, 0); + } + } else { + gst_tee_clear_query_allocation_meta (query); + } + break; + } default: res = gst_pad_query_default (pad, parent, query); break; @@ -529,11 +832,7 @@ gst_tee_do_message (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list) } GST_OBJECT_UNLOCK (tee); -#if !GLIB_CHECK_VERSION(2,26,0) - g_object_notify ((GObject *) tee, "last-message"); -#else g_object_notify_by_pspec ((GObject *) tee, pspec_last_message); -#endif } static GstFlowReturn @@ -558,15 +857,8 @@ gst_tee_do_push (GstTee * tee, GstPad * pad, gpointer data, gboolean is_list) static void clear_pads (GstPad * pad, GstTee * tee) { - PushData *data; - - data = g_object_get_qdata ((GObject *) pad, push_data); - - /* the data must be there or we have a screwed up internal state */ - g_assert (data != NULL); - - data->pushed = FALSE; - data->result = GST_FLOW_NOT_LINKED; + GST_TEE_PAD_CAST (pad)->pushed = FALSE; + GST_TEE_PAD_CAST (pad)->result = GST_FLOW_NOT_LINKED; } static GstFlowReturn @@ -590,6 +882,9 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) if (!pads->next) { GstPad *pad = GST_PAD_CAST (pads->data); + /* Keep another ref around, a pad probe + * might release and destroy the pad */ + gst_object_ref (pad); GST_OBJECT_UNLOCK (tee); if (pad == tee->pull_pad) { @@ -599,6 +894,18 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) } else { ret = gst_pad_push (pad, GST_BUFFER_CAST (data)); } + + GST_OBJECT_LOCK (tee); + if (GST_TEE_PAD_CAST (pad)->removed) + ret = GST_FLOW_NOT_LINKED; + + if (ret == GST_FLOW_NOT_LINKED && tee->allow_not_linked) { + ret = GST_FLOW_OK; + } + GST_OBJECT_UNLOCK (tee); + + gst_object_unref (pad); + return ret; } @@ -606,45 +913,44 @@ gst_tee_handle_data (GstTee * tee, gpointer data, gboolean is_list) g_list_foreach (pads, (GFunc) clear_pads, tee); restart: - cret = GST_FLOW_NOT_LINKED; + if (tee->allow_not_linked) { + cret = GST_FLOW_OK; + } else { + cret = GST_FLOW_NOT_LINKED; + } pads = GST_ELEMENT_CAST (tee)->srcpads; cookie = GST_ELEMENT_CAST (tee)->pads_cookie; while (pads) { GstPad *pad; - PushData *pdata; pad = GST_PAD_CAST (pads->data); - /* get the private data, something is really wrong with the internal state - * when it is not there */ - pdata = g_object_get_qdata ((GObject *) pad, push_data); - g_assert (pdata != NULL); - - if (G_LIKELY (!pdata->pushed)) { + if (G_LIKELY (!GST_TEE_PAD_CAST (pad)->pushed)) { /* not yet pushed, release lock and start pushing */ gst_object_ref (pad); GST_OBJECT_UNLOCK (tee); - GST_LOG_OBJECT (tee, "Starting to push %s %p", + GST_LOG_OBJECT (pad, "Starting to push %s %p", is_list ? "list" : "buffer", data); ret = gst_tee_do_push (tee, pad, data, is_list); - GST_LOG_OBJECT (tee, "Pushing item %p yielded result %s", data, + GST_LOG_OBJECT (pad, "Pushing item %p yielded result %s", data, gst_flow_get_name (ret)); GST_OBJECT_LOCK (tee); - /* keep track of which pad we pushed and the result value. We need to do - * this before we release the refcount on the pad, the PushData is - * destroyed when the last ref of the pad goes away. */ - pdata->pushed = TRUE; - pdata->result = ret; + /* keep track of which pad we pushed and the result value */ + if (GST_TEE_PAD_CAST (pad)->removed) + ret = GST_FLOW_NOT_LINKED; + GST_TEE_PAD_CAST (pad)->pushed = TRUE; + GST_TEE_PAD_CAST (pad)->result = ret; gst_object_unref (pad); + pad = NULL; } else { /* already pushed, use previous return value */ - ret = pdata->result; - GST_LOG_OBJECT (tee, "pad already pushed with %s", + ret = GST_TEE_PAD_CAST (pad)->result; + GST_LOG_OBJECT (pad, "pad already pushed with %s", gst_flow_get_name (ret)); } @@ -680,13 +986,23 @@ restart: /* ERRORS */ no_pads: { - GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked"); - ret = GST_FLOW_NOT_LINKED; - goto error; + if (tee->allow_not_linked) { + GST_DEBUG_OBJECT (tee, "there are no pads, dropping %s", + is_list ? "buffer-list" : "buffer"); + ret = GST_FLOW_OK; + } else { + GST_DEBUG_OBJECT (tee, "there are no pads, return not-linked"); + ret = GST_FLOW_NOT_LINKED; + } + goto end; } error: { GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret)); + goto end; + } +end: + { GST_OBJECT_UNLOCK (tee); gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); return ret; @@ -803,7 +1119,7 @@ gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, if (pad == tee->pull_pad) tee->pull_pad = NULL; } - tee->sink_mode = active & GST_PAD_MODE_PULL; + tee->sink_mode = (active ? GST_PAD_MODE_PULL : GST_PAD_MODE_NONE); GST_OBJECT_UNLOCK (tee); break; } @@ -811,6 +1127,7 @@ gst_tee_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, res = TRUE; break; } + return res; /* ERRORS */ @@ -897,8 +1214,10 @@ gst_tee_pull_eos (GstTee * tee) GstIterator *iter; iter = gst_element_iterate_src_pads (GST_ELEMENT (tee)); - gst_iterator_foreach (iter, (GstIteratorForeachFunction) gst_tee_push_eos, - tee); + while (gst_iterator_foreach (iter, + (GstIteratorForeachFunction) gst_tee_push_eos, + tee) == GST_ITERATOR_RESYNC) + gst_iterator_resync (iter); gst_iterator_free (iter); }