X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=9d6bb7a50db8f441fe0093a89bcd119d0c012fc9;hb=117200faebd922c6b994fac96f0bf6ba1d9a1881;hp=28bdde00b428a75f4065b23e03fda052482510d7;hpb=6eda156b2041175dc3e88ea6cfb842b9ce5089b1;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 28bdde0..9d6bb7a 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -22,8 +22,8 @@ /** * SECTION: gstaggregator * @title: GstAggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. + * @short_description: Base class for mixers and muxers, manages a set of input + * pads and aggregates their streams * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. @@ -32,12 +32,18 @@ * * Base class for mixers and muxers. Subclasses should at least implement * the #GstAggregatorClass.aggregate() virtual method. * - * * When data is queued on all pads, tha aggregate vmethod is called. + * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a + * #GstPadQueryFunction to queue all serialized data packets per sink pad. + * Subclasses should not overwrite those, but instead implement + * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as + * needed. + * + * * When data is queued on all pads, the aggregate vmethod is called. * * * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued + * gst_aggregator_pad_peek_buffer () method, and remove it from the pad + * with the gst_aggregator_pad_pop_buffer () method. When a buffer + * has been taken with pop_buffer (), a new buffer can be queued * on that pad. * * * If the subclass wishes to push a buffer downstream in its aggregate @@ -55,6 +61,22 @@ * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 + */ + +/** + * SECTION: gstaggregatorpad + * @title: GstAggregatorPad + * @short_description: #GstPad subclass for pads managed by #GstAggregator + * @see_also: gstcollectpads for historical reasons. + * + * Pads managed by a #GstAggregor subclass. + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 */ #ifdef HAVE_CONFIG_H @@ -94,12 +116,20 @@ gst_aggregator_start_time_selection_get_type (void) } /* Might become API */ +#if 0 static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +#endif static void gst_aggregator_set_latency_property (GstAggregator * agg, - gint64 latency); -static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); + GstClockTime latency); +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); + +static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); + +GST_DEBUG_CATEGORY_STATIC (aggregator_debug); +#define GST_CAT_DEFAULT aggregator_debug /* Locking order, locks in this element must always be taken in this order * @@ -113,12 +143,6 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) */ - -static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); - -GST_DEBUG_CATEGORY_STATIC (aggregator_debug); -#define GST_CAT_DEFAULT aggregator_debug - /* GstAggregatorPad definitions */ #define PAD_LOCK(pad) G_STMT_START { \ GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ @@ -207,18 +231,20 @@ struct _GstAggregatorPadPrivate GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; - gboolean pending_eos; gboolean first_buffer; - GQueue buffers; + GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; + + /* used to track fill state of queues, only used with live-src and when + * latency property is set to > 0 */ GstClockTime head_position; GstClockTime tail_position; - GstClockTime head_time; + GstClockTime head_time; /* running time */ GstClockTime tail_time; - GstClockTime time_level; + GstClockTime time_level; /* how much head is ahead of tail */ GstSegment head_segment; /* segment before the queue */ gboolean negotiated; @@ -237,7 +263,6 @@ struct _GstAggregatorPadPrivate static void gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) { - aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; GST_OBJECT_LOCK (aggpad); @@ -281,6 +306,8 @@ struct _GstAggregatorPrivate /* Our state is >= PAUSED */ gboolean running; /* protected by src_lock */ + /* seqnum from seek or segment, + * to be applied to synthetic segment/eos events */ gint seqnum; gboolean send_stream_start; /* protected by srcpad stream lock */ gboolean send_segment; @@ -320,13 +347,16 @@ struct _GstAggregatorPrivate gint64 latency; /* protected by both src_lock and all pad locks */ }; +/* Seek event forwarding helper */ typedef struct { + /* parameters */ GstEvent *event; - gboolean result; gboolean flush; gboolean only_to_active_pads; + /* results */ + gboolean result; gboolean one_actually_seeked; } EventData; @@ -346,99 +376,20 @@ enum static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); -/** - * gst_aggregator_iterate_sinkpads: - * @self: The #GstAggregator - * @func: (scope call): The function to call. - * @user_data: (closure): The data to pass to @func. - * - * Iterate the sinkpads of aggregator to call a function on them. - * - * This method guarantees that @func will be called only once for each - * sink pad. - */ -gboolean -gst_aggregator_iterate_sinkpads (GstAggregator * self, - GstAggregatorPadForeachFunc func, gpointer user_data) -{ - gboolean result = FALSE; - GstIterator *iter; - gboolean done = FALSE; - GValue item = { 0, }; - GList *seen_pads = NULL; - - iter = gst_element_iterate_sink_pads (GST_ELEMENT (self)); - - if (!iter) - goto no_iter; - - while (!done) { - switch (gst_iterator_next (iter, &item)) { - case GST_ITERATOR_OK: - { - GstAggregatorPad *pad; - - pad = g_value_get_object (&item); - - /* if already pushed, skip. FIXME, find something faster to tag pads */ - if (pad == NULL || g_list_find (seen_pads, pad)) { - g_value_reset (&item); - break; - } - - GST_LOG_OBJECT (pad, "calling function %s on pad", - GST_DEBUG_FUNCPTR_NAME (func)); - - result = func (self, pad, user_data); - - done = !result; - - seen_pads = g_list_prepend (seen_pads, pad); - - g_value_reset (&item); - break; - } - case GST_ITERATOR_RESYNC: - gst_iterator_resync (iter); - break; - case GST_ITERATOR_ERROR: - GST_ERROR_OBJECT (self, - "Could not iterate over internally linked pads"); - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; - } - } - g_value_unset (&item); - gst_iterator_free (iter); - - if (seen_pads == NULL) { - GST_DEBUG_OBJECT (self, "No pad seen"); - return FALSE; - } - - g_list_free (seen_pads); - -no_iter: - return result; -} - static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { - return (g_queue_peek_tail (&pad->priv->buffers) == NULL && + return (g_queue_peek_tail (&pad->priv->data) == NULL && pad->priv->clipped_buffer == NULL); } static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { - GstAggregatorPad *pad; + GstAggregatorPad *pad = NULL; GList *l, *sinkpads; gboolean have_buffer = TRUE; - gboolean have_event = FALSE; + gboolean have_event_or_query = FALSE; GST_LOG_OBJECT (self, "checking pads"); @@ -455,7 +406,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self) if (pad->priv->num_buffers == 0) { if (!gst_aggregator_pad_queue_is_empty (pad)) - have_event = TRUE; + have_event_or_query = TRUE; if (!pad->priv->eos) { have_buffer = FALSE; @@ -476,7 +427,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self) PAD_UNLOCK (pad); } - if (!have_buffer && !have_event) + if (!have_buffer && !have_event_or_query) goto pad_not_ready; if (have_buffer) @@ -494,13 +445,13 @@ no_sinkpads: } pad_not_ready: { - if (have_event) + if (have_event_or_query) GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," " but waking up for serialized event"); else GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); GST_OBJECT_UNLOCK (self); - return have_event; + return have_event_or_query; } } @@ -551,6 +502,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) segment = gst_event_new_segment (&self->segment); if (!self->priv->seqnum) + /* This code-path is in preparation to be able to run without a source + * connected. Then we won't have a seq-num from a segment event. */ self->priv->seqnum = gst_event_get_seqnum (segment); else gst_event_set_seqnum (segment, self->priv->seqnum); @@ -588,17 +541,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) GST_PAD_STREAM_UNLOCK (self->srcpad); } -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +static GstFlowReturn +gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); @@ -616,6 +560,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) } } +/** + * gst_aggregator_finish_buffer: + * @aggregator: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. + */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator); + + g_assert (klass->finish_buffer != NULL); + + return klass->finish_buffer (aggregator, buffer); +} + static void gst_aggregator_push_eos (GstAggregator * self) { @@ -748,26 +711,30 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } static gboolean -check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, + gpointer user_data) { + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); GstEvent *event = NULL; + GstQuery *query = NULL; GstAggregatorClass *klass = NULL; gboolean *processed_event = user_data; do { event = NULL; + query = NULL; PAD_LOCK (pad); - if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } if (pad->priv->clipped_buffer == NULL && - GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { - event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); } PAD_UNLOCK (pad); - if (event) { + if (event || query) { gboolean ret; if (processed_event) @@ -775,20 +742,72 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - gst_event_ref (event); - ret = klass->sink_event (self, pad, event); + if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + gst_event_ref (event); + ret = klass->sink_event (aggregator, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); + gst_event_unref (event); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (aggregator, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->data) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->data); + } + } - PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) - pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->buffers) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); - gst_event_unref (event); PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); } - } while (event != NULL); + } while (event || query); + + return TRUE; +} + +static gboolean +gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GList *item; + GstAggregatorPad *aggpad = (GstAggregatorPad *) epad; + GstAggregator *agg = (GstAggregator *) self; + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + if (!klass->skip_buffer) + return FALSE; + + PAD_LOCK (aggpad); + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + if (GST_IS_BUFFER (item->data) + && klass->skip_buffer (aggpad, agg, item->data)) { + GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); + gst_aggregator_pad_buffer_consumed (aggpad); + gst_buffer_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } else { + break; + } + + item = next; + } + + PAD_UNLOCK (aggpad); return TRUE; } @@ -805,7 +824,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, else aggpad->priv->flow_return = flow_return; - item = g_queue_peek_head_link (&aggpad->priv->buffers); + item = g_queue_peek_head_link (&aggpad->priv->data); while (item) { GList *next = item->next; @@ -816,8 +835,9 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || !GST_EVENT_IS_STICKY (item->data)) { - gst_mini_object_unref (item->data); - g_queue_delete_link (&aggpad->priv->buffers, item); + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); } item = next; } @@ -1071,12 +1091,21 @@ gst_aggregator_aggregate_func (GstAggregator * self) GstFlowReturn flow_return = GST_FLOW_OK; gboolean processed_event = FALSE; - gst_aggregator_iterate_sinkpads (self, check_events, NULL); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, NULL); + if (self->priv->peer_latency_live) + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_skip_buffers, NULL); + + /* Ensure we have buffers ready (either in clipped_buffer or at the head of + * the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &processed_event); + if (processed_event) continue; @@ -1289,38 +1318,37 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, static void update_time_level (GstAggregatorPad * aggpad, gboolean head) { + GstAggregatorPadPrivate *priv = aggpad->priv; + if (head) { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && - aggpad->priv->head_segment.format == GST_FORMAT_TIME) - aggpad->priv->head_time = - gst_segment_to_running_time (&aggpad->priv->head_segment, - GST_FORMAT_TIME, aggpad->priv->head_position); + if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && + priv->head_segment.format == GST_FORMAT_TIME) + priv->head_time = gst_segment_to_running_time (&priv->head_segment, + GST_FORMAT_TIME, priv->head_position); else - aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + priv->head_time = GST_CLOCK_TIME_NONE; - if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time)) - aggpad->priv->tail_time = aggpad->priv->head_time; + if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) + priv->tail_time = priv->head_time; } else { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && aggpad->segment.format == GST_FORMAT_TIME) - aggpad->priv->tail_time = - gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, aggpad->priv->tail_position); + priv->tail_time = gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, priv->tail_position); else - aggpad->priv->tail_time = aggpad->priv->head_time; + priv->tail_time = priv->head_time; } - if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || - aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { - aggpad->priv->time_level = 0; + if (priv->head_time == GST_CLOCK_TIME_NONE || + priv->tail_time == GST_CLOCK_TIME_NONE) { + priv->time_level = 0; return; } - if (aggpad->priv->tail_time > aggpad->priv->head_time) - aggpad->priv->time_level = 0; + if (priv->tail_time > priv->head_time) + priv->time_level = 0; else - aggpad->priv->time_level = aggpad->priv->head_time - - aggpad->priv->tail_time; + priv->time_level = priv->head_time - priv->tail_time; } @@ -1333,6 +1361,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { @@ -1343,8 +1373,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_FLUSH_STOP: { - GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); - gst_aggregator_pad_flush (aggpad, self); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -1376,21 +1404,11 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_EOS: { - GST_DEBUG_OBJECT (aggpad, "EOS"); - - /* We still have a buffer, and we don't want the subclass to have to - * check for it. Mark pending_eos, eos will be set when steal_buffer is - * called - */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->num_buffers == 0) { - aggpad->priv->eos = TRUE; - } else { - aggpad->priv->pending_eos = TRUE; - } + g_assert (aggpad->priv->num_buffers == 0); + aggpad->priv->eos = TRUE; PAD_UNLOCK (aggpad); - SRC_BROADCAST (self); SRC_UNLOCK (self); goto eat; @@ -1400,6 +1418,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + /* We've got a new segment, tail_position is now meaningless + * and may interfere with the time_level calculation + */ + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); PAD_UNLOCK (aggpad); @@ -1447,6 +1469,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + /* Remove GAP event so we can replace it with the buffer */ + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); @@ -1456,19 +1482,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, goto eat; } case GST_EVENT_TAG: - { - GstTagList *tags; - - gst_event_parse_tag (event, &tags); - - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; - goto eat; - } - break; - } + goto eat; default: { break; @@ -1486,14 +1500,18 @@ eat: return res; } -static inline gboolean -gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, - gpointer unused_udata) +static gboolean +gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) { - gst_aggregator_pad_flush (pad, self); + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *agg = GST_AGGREGATOR_CAST (self); + + gst_aggregator_pad_flush (pad, agg); PAD_LOCK (pad); + pad->priv->flow_return = GST_FLOW_FLUSHING; pad->priv->negotiated = FALSE; + PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); return TRUE; @@ -1507,7 +1525,9 @@ gst_aggregator_stop (GstAggregator * agg) gst_aggregator_reset_flow_values (agg); - gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL); + /* Application needs to make sure no pads are added while it shuts us down */ + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), + gst_aggregator_stop_pad, NULL); klass = GST_AGGREGATOR_GET_CLASS (agg); @@ -1518,7 +1538,7 @@ gst_aggregator_stop (GstAggregator * agg) agg->priv->has_peer_latency = FALSE; agg->priv->peer_latency_live = FALSE; - agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -1602,10 +1622,16 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GstAggregatorPrivate *priv = self->priv; gint serial = 0; gchar *name = NULL; + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); if (templ->direction != GST_PAD_SINK) goto not_sink; + if (templ->presence != GST_PAD_REQUEST) + goto not_request; + GST_OBJECT_LOCK (self); if (req_name == NULL || strlen (req_name) < 6 || !g_str_has_prefix (req_name, "sink_")) { @@ -1619,7 +1645,7 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, } name = g_strdup_printf ("sink_%u", serial); - agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, + agg_pad = g_object_new (pad_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); @@ -1630,7 +1656,12 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, /* errors */ not_sink: { - GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad\n"); + GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad"); + return NULL; + } +not_request: + { + GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad"); return NULL; } } @@ -1680,8 +1711,6 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) gst_query_parse_latency (query, &live, &min, &max); - our_latency = self->priv->latency; - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); @@ -1696,6 +1725,8 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) return FALSE; } + our_latency = self->priv->latency; + self->priv->peer_latency_live = live; self->priv->peer_latency_min = min; self->priv->peer_latency_max = max; @@ -1901,22 +1932,17 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) return FALSE; } -static EventData +static void gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, - GstEvent * event, gboolean flush, gboolean only_to_active_pads) + EventData * evdata) { - EventData evdata; - - evdata.event = event; - evdata.result = TRUE; - evdata.flush = flush; - evdata.one_actually_seeked = FALSE; - evdata.only_to_active_pads = only_to_active_pads; + evdata->result = TRUE; + evdata->one_actually_seeked = FALSE; /* We first need to set all pads as flushing in a first pass * as flush_start flush_stop is sometimes sent synchronously * while we send the seek event */ - if (flush) { + if (evdata->flush) { GList *l; GST_OBJECT_LOCK (self); @@ -1931,11 +1957,9 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, GST_OBJECT_UNLOCK (self); } - gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata); + gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); - gst_event_unref (event); - - return evdata; + gst_event_unref (evdata->event); } static gboolean @@ -1947,7 +1971,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) GstSeekType start_type, stop_type; gint64 start, stop; gboolean flush; - EventData evdata; + EventData evdata = { 0, }; GstAggregatorPrivate *priv = self->priv; gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, @@ -1971,8 +1995,10 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) GST_OBJECT_UNLOCK (self); /* forward the seek upstream */ - evdata = - gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE); + evdata.event = event; + evdata.flush = flush; + evdata.only_to_active_pads = FALSE; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); event = NULL; if (!evdata.result || !evdata.one_actually_seeked) { @@ -1990,41 +2016,28 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) static gboolean gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) { - EventData evdata; - gboolean res = TRUE; + EventData evdata = { 0, }; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - { - gst_event_ref (event); - res = gst_aggregator_do_seek (self, event); - gst_event_unref (event); - event = NULL; - goto done; - } + /* _do_seek() unrefs the event. */ + return gst_aggregator_do_seek (self, event); case GST_EVENT_NAVIGATION: - { /* navigation is rather pointless. */ - res = FALSE; gst_event_unref (event); - goto done; - } + return FALSE; default: - { break; - } } /* Don't forward QOS events to pads that had no active buffer yet. Otherwise * they will receive a QOS event that has earliest_time=0 (because we can't * have negative timestamps), and consider their buffer as too late */ - evdata = - gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE, - GST_EVENT_TYPE (event) == GST_EVENT_QOS); - res = evdata.result; - -done: - return res; + evdata.event = event; + evdata.flush = FALSE; + evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); + return evdata.result; } static gboolean @@ -2150,7 +2163,7 @@ gst_aggregator_finalize (GObject * object) * as unresponsive. */ static void -gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) +gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) { gboolean changed; @@ -2201,12 +2214,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) * before a pad is deemed unresponsive. A value of -1 means an * unlimited time. */ -static gint64 +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg) { - gint64 res; + GstClockTime res; - g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); GST_OBJECT_LOCK (agg); res = agg->priv->latency; @@ -2223,7 +2236,7 @@ gst_aggregator_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); + gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); break; case PROP_START_TIME_SELECTION: agg->priv->start_time_selection = g_value_get_enum (value); @@ -2245,7 +2258,7 @@ gst_aggregator_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); + g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); break; case PROP_START_TIME_SELECTION: g_value_set_enum (value, agg->priv->start_time_selection); @@ -2272,7 +2285,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); - klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD; + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; klass->sink_query = gst_aggregator_default_sink_query; @@ -2298,11 +2311,10 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->finalize = gst_aggregator_finalize; g_object_class_install_property (gobject_class, PROP_LATENCY, - g_param_spec_int64 ("latency", "Buffer latency", + g_param_spec_uint64 ("latency", "Buffer latency", "Additional latency in live mode to allow upstream " "to take longer to produce buffers for the current " - "position (in nanoseconds)", 0, - (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), + "position (in nanoseconds)", 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, @@ -2317,8 +2329,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "Start time to use if start-time-selection=set", 0, G_MAXUINT64, DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); } static void @@ -2446,6 +2456,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) update_time_level (aggpad, head); } +/* + * Can be called either from the sinkpad's chain function or from the srcpad's + * thread in the case of a buffer synthetized from a GAP event. + * Because of this second case, FLUSH_LOCK can't be used here. + */ + static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) @@ -2453,18 +2469,11 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GstFlowReturn flow_return; GstClockTime buf_pts; - GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - - PAD_FLUSH_LOCK (aggpad); - PAD_LOCK (aggpad); flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) goto flushing; - if (aggpad->priv->pending_eos == TRUE) - goto eos; - PAD_UNLOCK (aggpad); buf_pts = GST_BUFFER_PTS (buffer); @@ -2479,12 +2488,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, aggpad->priv->first_buffer = FALSE; } - if (gst_aggregator_pad_has_space (self, aggpad) + if ((gst_aggregator_pad_has_space (self, aggpad) || !head) && aggpad->priv->flow_return == GST_FLOW_OK) { if (head) - g_queue_push_head (&aggpad->priv->buffers, buffer); + g_queue_push_head (&aggpad->priv->data, buffer); else - g_queue_push_tail (&aggpad->priv->buffers, buffer); + g_queue_push_tail (&aggpad->priv->data, buffer); apply_buffer (aggpad, buffer, head); aggpad->priv->num_buffers++; buffer = NULL; @@ -2555,15 +2564,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); - PAD_FLUSH_UNLOCK (aggpad); - GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; flushing: PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", gst_flow_get_name (flow_return)); @@ -2571,56 +2577,83 @@ flushing: gst_buffer_unref (buffer); return flow_return; - -eos: - PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); - - return GST_FLOW_EOS; } static GstFlowReturn gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) { - return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), - GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); + GstFlowReturn ret; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + PAD_FLUSH_LOCK (aggpad); + + ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + aggpad, buffer, TRUE); + + PAD_FLUSH_UNLOCK (aggpad); + + return ret; } static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + while (!gst_aggregator_pad_queue_is_empty (aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); - } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + return ret; + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + return klass->sink_query (self, aggpad, query); + } flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + return FALSE; } +/* Queue serialized events and let the others go through directly. + * The queued events with be handled from the src-pad task in + * gst_aggregator_do_events_and_queries(). + */ static GstFlowReturn gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) @@ -2628,18 +2661,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstFlowReturn ret = GST_FLOW_OK; GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) { + if (GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->flow_return != GST_FLOW_OK - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - ret = aggpad->priv->flow_return; + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; - } if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GST_OBJECT_LOCK (aggpad); @@ -2649,18 +2678,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GST_OBJECT_UNLOCK (aggpad); } - if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, - event); - g_queue_push_head (&aggpad->priv->buffers, event); - event = NULL; - SRC_BROADCAST (self); - } + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); PAD_UNLOCK (aggpad); SRC_UNLOCK (self); - } + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (event) { if (!klass->sink_event (self, aggpad, event)) { /* Copied from GstPad to convert boolean to a GstFlowReturn in * the event handling func */ @@ -2679,7 +2704,7 @@ flushing: gst_pad_store_sticky_event (pad, event); gst_event_unref (event); - return ret; + return aggpad->priv->flow_return; } static gboolean @@ -2765,7 +2790,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - g_queue_init (&pad->priv->buffers); + g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); @@ -2781,10 +2806,6 @@ gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) { pad->priv->num_buffers--; GST_TRACE_OBJECT (pad, "Consuming buffer"); - if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } PAD_BROADCAST_EVENT (pad); } @@ -2793,12 +2814,12 @@ static void gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) { GstAggregator *self = NULL; - GstAggregatorClass *aggclass; + GstAggregatorClass *aggclass = NULL; GstBuffer *buffer = NULL; while (pad->priv->clipped_buffer == NULL && - GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { - buffer = g_queue_pop_tail (&pad->priv->buffers); + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); apply_buffer (pad, buffer, FALSE); @@ -2834,7 +2855,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_steal_buffer: + * gst_aggregator_pad_pop_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. @@ -2843,7 +2864,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2852,9 +2873,9 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; - pad->priv->clipped_buffer = NULL; if (buffer) { + pad->priv->clipped_buffer = NULL; gst_aggregator_pad_buffer_consumed (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } @@ -2877,7 +2898,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) { GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); + buf = gst_aggregator_pad_pop_buffer (pad); if (buf == NULL) return FALSE; @@ -2887,7 +2908,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_get_buffer: + * gst_aggregator_pad_peek_buffer: * @pad: the pad to get buffer from * * Returns: (transfer full): A reference to the buffer in @pad or @@ -2895,7 +2916,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) * usage. */ GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; @@ -2913,6 +2934,12 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) return buffer; } +/** + * gst_aggregator_pad_is_eos: + * @pad: an aggregator pad + * + * Returns: %TRUE if the pad is EOS, otherwise %FALSE. + */ gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad) { @@ -2925,7 +2952,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } -/** +#if 0 +/* * gst_aggregator_merge_tags: * @self: a #GstAggregator * @tags: a #GstTagList to merge @@ -2959,6 +2987,7 @@ gst_aggregator_merge_tags (GstAggregator * self, self->priv->tags_changed = TRUE; GST_OBJECT_UNLOCK (self); } +#endif /** * gst_aggregator_set_latency: