X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=9d6bb7a50db8f441fe0093a89bcd119d0c012fc9;hb=117200faebd922c6b994fac96f0bf6ba1d9a1881;hp=13afdc6572ba0412d03680569585527c99cc0cb2;hpb=113a2c508be734bc4ff89d790878fe656ebb90d1;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 13afdc6..9d6bb7a 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -22,8 +22,8 @@ /** * SECTION: gstaggregator * @title: GstAggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. + * @short_description: Base class for mixers and muxers, manages a set of input + * pads and aggregates their streams * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. @@ -41,9 +41,9 @@ * * When data is queued on all pads, the aggregate vmethod is called. * * * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued + * gst_aggregator_pad_peek_buffer () method, and remove it from the pad + * with the gst_aggregator_pad_pop_buffer () method. When a buffer + * has been taken with pop_buffer (), a new buffer can be queued * on that pad. * * * If the subclass wishes to push a buffer downstream in its aggregate @@ -61,6 +61,22 @@ * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 + */ + +/** + * SECTION: gstaggregatorpad + * @title: GstAggregatorPad + * @short_description: #GstPad subclass for pads managed by #GstAggregator + * @see_also: gstcollectpads for historical reasons. + * + * Pads managed by a #GstAggregor subclass. + * + * This class used to live in gst-plugins-bad and was moved to core. 
+ * + * Since: 1.14 */ #ifdef HAVE_CONFIG_H @@ -100,14 +116,18 @@ gst_aggregator_start_time_selection_get_type (void) } /* Might become API */ +#if 0 static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +#endif static void gst_aggregator_set_latency_property (GstAggregator * agg, - gint64 latency); -static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); + GstClockTime latency); +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); + GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -211,13 +231,15 @@ struct _GstAggregatorPadPrivate GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; - gboolean pending_eos; gboolean first_buffer; GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; + + /* used to track fill state of queues, only used with live-src and when + * latency property is set to > 0 */ GstClockTime head_position; GstClockTime tail_position; GstClockTime head_time; /* running time */ @@ -241,7 +263,6 @@ struct _GstAggregatorPadPrivate static void gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) { - aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; GST_OBJECT_LOCK (aggpad); @@ -285,6 +306,8 @@ struct _GstAggregatorPrivate /* Our state is >= PAUSED */ gboolean running; /* protected by src_lock */ + /* seqnum from seek or segment, + * to be applied to synthetic segment/eos events */ gint seqnum; gboolean send_stream_start; /* protected by srcpad stream lock */ gboolean send_segment; @@ -353,87 +376,6 @@ enum static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); -/** - * gst_aggregator_iterate_sinkpads: - * @self: The #GstAggregator - * @func: (scope call): The function to call. - * @user_data: (closure): The data to pass to @func. - * - * Iterate the sinkpads of aggregator to call a function on them. - * - * This method guarantees that @func will be called only once for each - * sink pad. - * - * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE - */ -gboolean -gst_aggregator_iterate_sinkpads (GstAggregator * self, - GstAggregatorPadForeachFunc func, gpointer user_data) -{ - gboolean result = FALSE; - GstIterator *iter; - gboolean done = FALSE; - GValue item = { 0, }; - GList *seen_pads = NULL; - - iter = gst_element_iterate_sink_pads (GST_ELEMENT (self)); - - if (!iter) - goto no_iter; - - while (!done) { - switch (gst_iterator_next (iter, &item)) { - case GST_ITERATOR_OK: - { - GstAggregatorPad *pad; - - pad = g_value_get_object (&item); - - /* if already pushed, skip. 
FIXME, find something faster to tag pads */ - if (pad == NULL || g_list_find (seen_pads, pad)) { - g_value_reset (&item); - break; - } - - GST_LOG_OBJECT (pad, "calling function %s on pad", - GST_DEBUG_FUNCPTR_NAME (func)); - - result = func (self, pad, user_data); - - done = !result; - - seen_pads = g_list_prepend (seen_pads, pad); - - g_value_reset (&item); - break; - } - case GST_ITERATOR_RESYNC: - gst_iterator_resync (iter); - break; - case GST_ITERATOR_ERROR: - GST_ERROR_OBJECT (self, - "Could not iterate over internally linked pads"); - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; - } - } - g_value_unset (&item); - gst_iterator_free (iter); - - if (seen_pads == NULL) { - GST_DEBUG_OBJECT (self, "No pad seen"); - return FALSE; - } - - g_list_free (seen_pads); - -no_iter: - return result; -} - static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { @@ -444,7 +386,7 @@ gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { - GstAggregatorPad *pad; + GstAggregatorPad *pad = NULL; GList *l, *sinkpads; gboolean have_buffer = TRUE; gboolean have_event_or_query = FALSE; @@ -560,6 +502,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) segment = gst_event_new_segment (&self->segment); if (!self->priv->seqnum) + /* This code-path is in preparation to be able to run without a source + * connected. Then we won't have a seq-num from a segment event. */ self->priv->seqnum = gst_event_get_seqnum (segment); else gst_event_set_seqnum (segment, self->priv->seqnum); @@ -597,17 +541,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) GST_PAD_STREAM_UNLOCK (self->srcpad); } -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +static GstFlowReturn +gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); @@ -625,6 +560,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) } } +/** + * gst_aggregator_finish_buffer: + * @aggregator: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. 
+ */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator); + + g_assert (klass->finish_buffer != NULL); + + return klass->finish_buffer (aggregator, buffer); +} + static void gst_aggregator_push_eos (GstAggregator * self) { @@ -757,9 +711,11 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } static gboolean -gst_aggregator_do_events_and_queries (GstAggregator * self, - GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, + gpointer user_data) { + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); GstEvent *event = NULL; GstQuery *query = NULL; GstAggregatorClass *klass = NULL; @@ -770,10 +726,6 @@ gst_aggregator_do_events_and_queries (GstAggregator * self, query = NULL; PAD_LOCK (pad); - if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } if (pad->priv->clipped_buffer == NULL && !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) @@ -793,7 +745,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self, if (event) { GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); gst_event_ref (event); - ret = klass->sink_event (self, pad, event); + ret = klass->sink_event (aggregator, pad, event); PAD_LOCK (pad); if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) @@ -803,7 +755,7 @@ gst_aggregator_do_events_and_queries (GstAggregator * self, gst_event_unref (event); } else if (query) { GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); - ret = klass->sink_query (self, pad, query); + ret = klass->sink_query (aggregator, pad, query); PAD_LOCK (pad); if (g_queue_peek_tail (&pad->priv->data) == query) { @@ -824,6 +776,42 @@ gst_aggregator_do_events_and_queries (GstAggregator * self, return TRUE; } +static gboolean +gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GList *item; + GstAggregatorPad *aggpad = (GstAggregatorPad *) epad; + GstAggregator *agg = (GstAggregator *) self; + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + if (!klass->skip_buffer) + return FALSE; + + PAD_LOCK (aggpad); + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + if (GST_IS_BUFFER (item->data) + && klass->skip_buffer (aggpad, agg, item->data)) { + GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); + gst_aggregator_pad_buffer_consumed (aggpad); + gst_buffer_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } else { + break; + } + + item = next; + } + + PAD_UNLOCK (aggpad); + + return TRUE; +} + static void gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GstFlowReturn flow_return, gboolean full) @@ -1103,14 +1091,21 @@ gst_aggregator_aggregate_func (GstAggregator * self) GstFlowReturn flow_return = GST_FLOW_OK; gboolean processed_event = FALSE; - gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, - NULL); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, NULL); + + if (self->priv->peer_latency_live) + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_skip_buffers, NULL); + /* Ensure we have buffers ready (either in clipped_buffer or at the head of + * 
the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, - &processed_event); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &processed_event); + if (processed_event) continue; @@ -1323,38 +1318,37 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, static void update_time_level (GstAggregatorPad * aggpad, gboolean head) { + GstAggregatorPadPrivate *priv = aggpad->priv; + if (head) { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && - aggpad->priv->head_segment.format == GST_FORMAT_TIME) - aggpad->priv->head_time = - gst_segment_to_running_time (&aggpad->priv->head_segment, - GST_FORMAT_TIME, aggpad->priv->head_position); + if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && + priv->head_segment.format == GST_FORMAT_TIME) + priv->head_time = gst_segment_to_running_time (&priv->head_segment, + GST_FORMAT_TIME, priv->head_position); else - aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + priv->head_time = GST_CLOCK_TIME_NONE; - if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time)) - aggpad->priv->tail_time = aggpad->priv->head_time; + if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) + priv->tail_time = priv->head_time; } else { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && aggpad->segment.format == GST_FORMAT_TIME) - aggpad->priv->tail_time = - gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, aggpad->priv->tail_position); + priv->tail_time = gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, priv->tail_position); else - aggpad->priv->tail_time = aggpad->priv->head_time; + priv->tail_time = priv->head_time; } - if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || - aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { - aggpad->priv->time_level = 0; + if (priv->head_time == GST_CLOCK_TIME_NONE || + priv->tail_time == GST_CLOCK_TIME_NONE) { + priv->time_level = 0; return; } - if (aggpad->priv->tail_time > aggpad->priv->head_time) - aggpad->priv->time_level = 0; + if (priv->tail_time > priv->head_time) + priv->time_level = 0; else - aggpad->priv->time_level = aggpad->priv->head_time - - aggpad->priv->tail_time; + priv->time_level = priv->head_time - priv->tail_time; } @@ -1410,19 +1404,11 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_EOS: { - /* We still have a buffer, and we don't want the subclass to have to - * check for it. 
Mark pending_eos, eos will be set when steal_buffer is - * called - */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->num_buffers == 0) { - aggpad->priv->eos = TRUE; - } else { - aggpad->priv->pending_eos = TRUE; - } + g_assert (aggpad->priv->num_buffers == 0); + aggpad->priv->eos = TRUE; PAD_UNLOCK (aggpad); - SRC_BROADCAST (self); SRC_UNLOCK (self); goto eat; @@ -1496,19 +1482,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, goto eat; } case GST_EVENT_TAG: - { - GstTagList *tags; - - gst_event_parse_tag (event, &tags); - - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; - goto eat; - } - break; - } + goto eat; default: { break; @@ -1526,11 +1500,13 @@ eat: return res; } -static inline gboolean -gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, - gpointer unused_udata) +static gboolean +gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) { - gst_aggregator_pad_flush (pad, self); + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *agg = GST_AGGREGATOR_CAST (self); + + gst_aggregator_pad_flush (pad, agg); PAD_LOCK (pad); pad->priv->flow_return = GST_FLOW_FLUSHING; @@ -1549,7 +1525,9 @@ gst_aggregator_stop (GstAggregator * agg) gst_aggregator_reset_flow_values (agg); - gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL); + /* Application needs to make sure no pads are added while it shuts us down */ + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), + gst_aggregator_stop_pad, NULL); klass = GST_AGGREGATOR_GET_CLASS (agg); @@ -1560,7 +1538,7 @@ gst_aggregator_stop (GstAggregator * agg) agg->priv->has_peer_latency = FALSE; agg->priv->peer_latency_live = FALSE; - agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -1644,6 +1622,9 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GstAggregatorPrivate *priv = self->priv; gint serial = 0; gchar *name = NULL; + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); if (templ->direction != GST_PAD_SINK) goto not_sink; @@ -1664,7 +1645,7 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, } name = g_strdup_printf ("sink_%u", serial); - agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, + agg_pad = g_object_new (pad_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); @@ -1730,8 +1711,6 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) gst_query_parse_latency (query, &live, &min, &max); - our_latency = self->priv->latency; - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); @@ -1746,6 +1725,8 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) return FALSE; } + our_latency = self->priv->latency; + self->priv->peer_latency_live = live; self->priv->peer_latency_min = min; self->priv->peer_latency_max = max; @@ -2182,7 +2163,7 @@ gst_aggregator_finalize (GObject * object) * as unresponsive. 
*/ static void -gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) +gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) { gboolean changed; @@ -2233,12 +2214,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) * before a pad is deemed unresponsive. A value of -1 means an * unlimited time. */ -static gint64 +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg) { - gint64 res; + GstClockTime res; - g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); GST_OBJECT_LOCK (agg); res = agg->priv->latency; @@ -2255,7 +2236,7 @@ gst_aggregator_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); + gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); break; case PROP_START_TIME_SELECTION: agg->priv->start_time_selection = g_value_get_enum (value); @@ -2277,7 +2258,7 @@ gst_aggregator_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); + g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); break; case PROP_START_TIME_SELECTION: g_value_set_enum (value, agg->priv->start_time_selection); @@ -2304,7 +2285,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); - klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD; + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; klass->sink_query = gst_aggregator_default_sink_query; @@ -2330,11 +2311,10 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->finalize = gst_aggregator_finalize; g_object_class_install_property (gobject_class, PROP_LATENCY, - g_param_spec_int64 ("latency", "Buffer latency", + g_param_spec_uint64 ("latency", "Buffer latency", "Additional latency in live mode to allow upstream " "to take longer to produce buffers for the current " - "position (in nanoseconds)", 0, - (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), + "position (in nanoseconds)", 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, @@ -2349,9 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "Start time to use if start-time-selection=set", 0, G_MAXUINT64, DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); } static void @@ -2479,6 +2456,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) update_time_level (aggpad, head); } +/* + * Can be called either from the sinkpad's chain function or from the srcpad's + * thread in the case of a buffer synthetized from a GAP event. + * Because of this second case, FLUSH_LOCK can't be used here. 
+ */ + static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) @@ -2486,18 +2469,11 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GstFlowReturn flow_return; GstClockTime buf_pts; - GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - - PAD_FLUSH_LOCK (aggpad); - PAD_LOCK (aggpad); flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) goto flushing; - if (aggpad->priv->pending_eos == TRUE) - goto eos; - PAD_UNLOCK (aggpad); buf_pts = GST_BUFFER_PTS (buffer); @@ -2512,7 +2488,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, aggpad->priv->first_buffer = FALSE; } - if (gst_aggregator_pad_has_space (self, aggpad) + if ((gst_aggregator_pad_has_space (self, aggpad) || !head) && aggpad->priv->flow_return == GST_FLOW_OK) { if (head) g_queue_push_head (&aggpad->priv->data, buffer); @@ -2588,15 +2564,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); - PAD_FLUSH_UNLOCK (aggpad); - GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; flushing: PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", gst_flow_get_name (flow_return)); @@ -2604,22 +2577,22 @@ flushing: gst_buffer_unref (buffer); return flow_return; - -eos: - PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); - - return GST_FLOW_EOS; } static GstFlowReturn gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) { - return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), - GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); + GstFlowReturn ret; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + PAD_FLUSH_LOCK (aggpad); + + ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + aggpad, buffer, TRUE); + + PAD_FLUSH_UNLOCK (aggpad); + + return ret; } static gboolean @@ -2628,7 +2601,6 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, { GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { GstStructure *s; @@ -2664,9 +2636,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, PAD_UNLOCK (aggpad); return ret; - } + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - return klass->sink_query (self, aggpad, query); + return klass->sink_query (self, aggpad, query); + } flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", @@ -2676,7 +2650,7 @@ flushing: return FALSE; } -/* Queue serialized events and let the others go though directly. +/* Queue serialized events and let the others go through directly. * The queued events with be handled from the src-pad task in * gst_aggregator_do_events_and_queries(). 
*/ @@ -2687,18 +2661,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstFlowReturn ret = GST_FLOW_OK; GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_EVENT_IS_SERIALIZED (event) - && GST_EVENT_TYPE (event) != GST_EVENT_EOS) { + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->flow_return != GST_FLOW_OK - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - ret = aggpad->priv->flow_return; + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; - } if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GST_OBJECT_LOCK (aggpad); @@ -2708,18 +2678,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GST_OBJECT_UNLOCK (aggpad); } - if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, - event); - g_queue_push_head (&aggpad->priv->data, event); - event = NULL; - SRC_BROADCAST (self); - } + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); PAD_UNLOCK (aggpad); SRC_UNLOCK (self); - } + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (event) { if (!klass->sink_event (self, aggpad, event)) { /* Copied from GstPad to convert boolean to a GstFlowReturn in * the event handling func */ @@ -2738,7 +2704,7 @@ flushing: gst_pad_store_sticky_event (pad, event); gst_event_unref (event); - return ret; + return aggpad->priv->flow_return; } static gboolean @@ -2840,10 +2806,6 @@ gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) { pad->priv->num_buffers--; GST_TRACE_OBJECT (pad, "Consuming buffer"); - if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } PAD_BROADCAST_EVENT (pad); } @@ -2893,7 +2855,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_steal_buffer: + * gst_aggregator_pad_pop_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. @@ -2902,7 +2864,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2936,7 +2898,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) { GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); + buf = gst_aggregator_pad_pop_buffer (pad); if (buf == NULL) return FALSE; @@ -2946,7 +2908,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_get_buffer: + * gst_aggregator_pad_peek_buffer: * @pad: the pad to get buffer from * * Returns: (transfer full): A reference to the buffer in @pad or @@ -2954,7 +2916,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) * usage. */ GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2972,6 +2934,12 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) return buffer; } +/** + * gst_aggregator_pad_is_eos: + * @pad: an aggregator pad + * + * Returns: %TRUE if the pad is EOS, otherwise %FALSE. 
+ */ gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad) { @@ -2984,7 +2952,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } -/** +#if 0 +/* * gst_aggregator_merge_tags: * @self: a #GstAggregator * @tags: a #GstTagList to merge @@ -3018,6 +2987,7 @@ gst_aggregator_merge_tags (GstAggregator * self, self->priv->tags_changed = TRUE; GST_OBJECT_UNLOCK (self); } +#endif /** * gst_aggregator_set_latency:
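
For reference, the renamed pad API documented above (gst_aggregator_pad_peek_buffer (), gst_aggregator_pad_pop_buffer ()) and gst_aggregator_finish_buffer () are typically used together from a subclass's aggregate () vfunc. The following is a minimal sketch only; the GstMyAgg element name and the "forward the first available buffer" strategy are placeholders for illustration, not part of this change.

#include <gst/base/gstaggregator.h>

/* Hypothetical subclass: peek to inspect queued input, pop to take
 * ownership so a new buffer can be queued on the pad, finish_buffer to
 * push the result downstream (stream-start/caps/segment are sent as
 * needed by the base class). */
static GstFlowReturn
gst_my_agg_aggregate (GstAggregator * agg, gboolean timeout)
{
  GList *l;
  GstBuffer *outbuf = NULL;

  GST_OBJECT_LOCK (agg);
  for (l = GST_ELEMENT_CAST (agg)->sinkpads; l; l = l->next) {
    GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
    GstBuffer *inbuf = gst_aggregator_pad_peek_buffer (pad);

    if (inbuf == NULL)
      continue;                 /* nothing queued on this pad (yet) */

    /* Placeholder "mixing": just keep the first available buffer */
    if (outbuf == NULL)
      outbuf = gst_buffer_ref (inbuf);
    gst_buffer_unref (inbuf);

    /* Remove the consumed buffer from the pad's queue */
    inbuf = gst_aggregator_pad_pop_buffer (pad);
    if (inbuf)
      gst_buffer_unref (inbuf);
  }
  GST_OBJECT_UNLOCK (agg);

  if (outbuf == NULL)
    return GST_FLOW_OK;

  return gst_aggregator_finish_buffer (agg, outbuf);
}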
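
The GstAggregatorPadClass::skip_buffer vfunc used by gst_aggregator_pad_skip_buffers () above is only invoked while upstream is live. A possible implementation for a hypothetical pad subclass is sketched below; the GAP/DROPPABLE policy is merely an example consistent with the buffer flags mentioned in the class documentation, not something mandated by this change.

#include <gst/base/gstaggregator.h>

/* Hypothetical pad subclass; only the pieces needed to hook up
 * skip_buffer are shown. */
typedef struct
{
  GstAggregatorPadClass parent_class;
} GstMyAggPadClass;

/* Called for each queued buffer, starting at the head of the queue.
 * Returning TRUE consumes and drops the buffer before it ever reaches
 * aggregate (); returning FALSE stops the scan for this pad. */
static gboolean
gst_my_agg_pad_skip_buffer (GstAggregatorPad * aggpad, GstAggregator * agg,
    GstBuffer * buffer)
{
  /* Example policy only: drop filler buffers flagged as GAP/DROPPABLE */
  return GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_GAP) &&
      GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DROPPABLE);
}

static void
gst_my_agg_pad_class_init (GstMyAggPadClass * klass)
{
  ((GstAggregatorPadClass *) klass)->skip_buffer =
      gst_my_agg_pad_skip_buffer;
}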
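
Because the "latency" property changes from gint64 to guint64 (a GstClockTime in nanoseconds), applications setting it through the GObject property system must pass an unsigned 64-bit value. A minimal usage sketch, assuming an already-created aggregator element; the 50 ms figure is arbitrary:

#include <gst/gst.h>

static void
configure_latency (GstElement * aggregator)
{
  guint64 latency;

  /* The property is now a guint64, so pass an unsigned 64-bit value */
  g_object_set (aggregator, "latency", (guint64) (50 * GST_MSECOND), NULL);

  /* Reading it back likewise yields a guint64 / GstClockTime */
  g_object_get (aggregator, "latency", &latency, NULL);
  g_print ("latency: %" GST_TIME_FORMAT "\n", GST_TIME_ARGS (latency));
}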