X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=89778a3dfb9ec9ed1e63439c29bc490cf410717c;hb=f1aba33090ba04596984a11570773512ea44f5d6;hp=9dddf837c46a8078a1925050a1f8a1a84990436c;hpb=79d0239e2fe92ea39a6abc0ecdccc92034a64ce6;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 9dddf83..89778a3 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -22,8 +22,8 @@ /** * SECTION: gstaggregator * @title: GstAggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. + * @short_description: Base class for mixers and muxers, manages a set of input + * pads and aggregates their streams * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. @@ -41,9 +41,9 @@ * * When data is queued on all pads, the aggregate vmethod is called. * * * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued + * gst_aggregator_pad_peek_buffer () method, and remove it from the pad + * with the gst_aggregator_pad_pop_buffer () method. When a buffer + * has been taken with pop_buffer (), a new buffer can be queued * on that pad. * * * If the subclass wishes to push a buffer downstream in its aggregate @@ -61,6 +61,22 @@ * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 + */ + +/** + * SECTION: gstaggregatorpad + * @title: GstAggregatorPad + * @short_description: #GstPad subclass for pads managed by #GstAggregator + * @see_also: gstcollectpads for historical reasons. + * + * Pads managed by a #GstAggregor subclass. + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 */ #ifdef HAVE_CONFIG_H @@ -100,14 +116,18 @@ gst_aggregator_start_time_selection_get_type (void) } /* Might become API */ +#if 0 static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +#endif static void gst_aggregator_set_latency_property (GstAggregator * agg, GstClockTime latency); static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -366,7 +386,7 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { - GstAggregatorPad *pad; + GstAggregatorPad *pad = NULL; GList *l, *sinkpads; gboolean have_buffer = TRUE; gboolean have_event_or_query = FALSE; @@ -521,17 +541,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) GST_PAD_STREAM_UNLOCK (self->srcpad); } -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +static GstFlowReturn +gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); @@ -549,6 +560,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) } } +/** + * gst_aggregator_finish_buffer: + * @aggregator: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. + */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator); + + g_assert (klass->finish_buffer != NULL); + + return klass->finish_buffer (aggregator, buffer); +} + static void gst_aggregator_push_eos (GstAggregator * self) { @@ -746,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, return TRUE; } +static gboolean +gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GList *item; + GstAggregatorPad *aggpad = (GstAggregatorPad *) epad; + GstAggregator *agg = (GstAggregator *) self; + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + if (!klass->skip_buffer) + return FALSE; + + PAD_LOCK (aggpad); + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + if (GST_IS_BUFFER (item->data) + && klass->skip_buffer (aggpad, agg, item->data)) { + GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); + gst_aggregator_pad_buffer_consumed (aggpad); + gst_buffer_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } else { + break; + } + + item = next; + } + + PAD_UNLOCK (aggpad); + + return TRUE; +} + static void gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GstFlowReturn flow_return, gboolean full) @@ -1028,6 +1094,10 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), gst_aggregator_do_events_and_queries, NULL); + if (self->priv->peer_latency_live) + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_skip_buffers, NULL); + /* Ensure we have buffers ready (either in clipped_buffer or at the head of * the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) @@ -1412,19 +1482,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, goto eat; } case GST_EVENT_TAG: - { - GstTagList *tags; - - gst_event_parse_tag (event, &tags); - - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; - goto eat; - } - break; - } + goto eat; default: { break; @@ -2227,6 +2285,8 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); + klass->finish_buffer = gst_aggregator_default_finish_buffer; + klass->sink_event = gst_aggregator_default_sink_event; klass->sink_query = gst_aggregator_default_sink_query; @@ -2269,8 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "Start time to use if start-time-selection=set", 0, G_MAXUINT64, DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); } static void @@ -2411,8 +2469,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GstFlowReturn flow_return; GstClockTime buf_pts; - GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - PAD_LOCK (aggpad); flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) @@ -2683,14 +2739,16 @@ gst_aggregator_pad_constructed (GObject * object) { GstPad *pad = GST_PAD (object); - gst_pad_set_chain_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); - gst_pad_set_event_full_function_full (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); - gst_pad_set_query_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); - gst_pad_set_activatemode_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); + if (GST_PAD_IS_SINK (pad)) { + gst_pad_set_chain_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); + gst_pad_set_event_full_function_full (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); + gst_pad_set_query_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); + gst_pad_set_activatemode_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); + } } static void @@ -2799,7 +2857,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_steal_buffer: + * gst_aggregator_pad_pop_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. @@ -2808,7 +2866,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2842,7 +2900,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) { GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); + buf = gst_aggregator_pad_pop_buffer (pad); if (buf == NULL) return FALSE; @@ -2852,7 +2910,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_get_buffer: + * gst_aggregator_pad_peek_buffer: * @pad: the pad to get buffer from * * Returns: (transfer full): A reference to the buffer in @pad or @@ -2860,7 +2918,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) * usage. */ GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2896,7 +2954,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } -/** +#if 0 +/* * gst_aggregator_merge_tags: * @self: a #GstAggregator * @tags: a #GstTagList to merge @@ -2930,6 +2989,7 @@ gst_aggregator_merge_tags (GstAggregator * self, self->priv->tags_changed = TRUE; GST_OBJECT_UNLOCK (self); } +#endif /** * gst_aggregator_set_latency: