X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=94d481604fb8564afb251f0be5e567a6d45d7b8b;hb=5bf13cdd5314bc3c6c81bd620e712acdcab14eb2;hp=315b8b820ec287404670c298496a4f92bbfb176f;hpb=4f81aa8742ffd154dfbc72653e8dc34bb36d6139;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 315b8b8..94d4816 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -22,8 +22,8 @@ /** * SECTION: gstaggregator * @title: GstAggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. + * @short_description: Base class for mixers and muxers, manages a set of input + * pads and aggregates their streams * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. @@ -32,12 +32,18 @@ * * Base class for mixers and muxers. Subclasses should at least implement * the #GstAggregatorClass.aggregate() virtual method. * - * * When data is queued on all pads, tha aggregate vmethod is called. + * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a + * #GstPadQueryFunction to queue all serialized data packets per sink pad. + * Subclasses should not overwrite those, but instead implement + * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as + * needed. + * + * * When data is queued on all pads, the aggregate vmethod is called. * * * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued + * gst_aggregator_pad_peek_buffer () method, and remove it from the pad + * with the gst_aggregator_pad_pop_buffer () method. When a buffer + * has been taken with pop_buffer (), a new buffer can be queued * on that pad. * * * If the subclass wishes to push a buffer downstream in its aggregate @@ -55,6 +61,26 @@ * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. * + * * Subclasses must use (a subclass of) #GstAggregatorPad for both their + * sink and source pads. + * See gst_element_class_add_static_pad_template_with_gtype(). + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 + */ + +/** + * SECTION: gstaggregatorpad + * @title: GstAggregatorPad + * @short_description: #GstPad subclass for pads managed by #GstAggregator + * @see_also: gstcollectpads for historical reasons. + * + * Pads managed by a #GstAggregor subclass. + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 */ #ifdef HAVE_CONFIG_H @@ -94,12 +120,21 @@ gst_aggregator_start_time_selection_get_type (void) } /* Might become API */ +#if 0 static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +#endif static void gst_aggregator_set_latency_property (GstAggregator * agg, - gint64 latency); -static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); + GstClockTime latency); +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); +static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); + +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, + GstBuffer * buffer); + +GST_DEBUG_CATEGORY_STATIC (aggregator_debug); +#define GST_CAT_DEFAULT aggregator_debug /* Locking order, locks in this element must always be taken in this order * @@ -113,12 +148,6 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) */ - -static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); - -GST_DEBUG_CATEGORY_STATIC (aggregator_debug); -#define GST_CAT_DEFAULT aggregator_debug - /* GstAggregatorPad definitions */ #define PAD_LOCK(pad) G_STMT_START { \ GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ @@ -207,18 +236,20 @@ struct _GstAggregatorPadPrivate GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; - gboolean pending_eos; gboolean first_buffer; - GQueue buffers; + GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; + + /* used to track fill state of queues, only used with live-src and when + * latency property is set to > 0 */ GstClockTime head_position; GstClockTime tail_position; - GstClockTime head_time; + GstClockTime head_time; /* running time */ GstClockTime tail_time; - GstClockTime time_level; + GstClockTime time_level; /* how much head is ahead of tail */ GstSegment head_segment; /* segment before the queue */ gboolean negotiated; @@ -231,13 +262,15 @@ struct _GstAggregatorPadPrivate * the chain function is also happening. */ GMutex flush_lock; + + /* properties */ + gboolean emit_signals; }; /* Must be called with PAD_LOCK held */ static void gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) { - aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; GST_OBJECT_LOCK (aggpad); @@ -262,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) PAD_UNLOCK (aggpad); if (klass->flush) - return klass->flush (aggpad, agg); + return (klass->flush (aggpad, agg) == GST_FLOW_OK); return TRUE; } @@ -271,6 +304,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) * GstAggregator implementation * *************************************/ static GstElementClass *aggregator_parent_class = NULL; +static gint aggregator_private_offset = 0; /* All members are protected by the object lock unless otherwise noted */ @@ -281,6 +315,8 @@ struct _GstAggregatorPrivate /* Our state is >= PAUSED */ gboolean running; /* protected by src_lock */ + /* seqnum from seek or segment, + * to be applied to synthetic segment/eos events */ gint seqnum; gboolean send_stream_start; /* protected by srcpad stream lock */ gboolean send_segment; @@ -301,6 +337,8 @@ struct _GstAggregatorPrivate GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */ + GstClockTime upstream_latency_min; /* protected by src_lock */ + /* aggregate */ GstClockID aggregate_id; /* protected by src_lock */ GMutex src_lock; @@ -320,17 +358,21 @@ struct _GstAggregatorPrivate gint64 latency; /* protected by both src_lock and all pad locks */ }; +/* Seek event forwarding helper */ typedef struct { + /* parameters */ GstEvent *event; - gboolean result; gboolean flush; gboolean only_to_active_pads; + /* results */ + gboolean result; gboolean one_actually_seeked; } EventData; #define DEFAULT_LATENCY 0 +#define DEFAULT_MIN_UPSTREAM_LATENCY 0 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME (-1) @@ -338,6 +380,7 @@ enum { PROP_0, PROP_LATENCY, + PROP_MIN_UPSTREAM_LATENCY, PROP_START_TIME_SELECTION, PROP_START_TIME, PROP_LAST @@ -346,99 +389,20 @@ enum static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); -/** - * gst_aggregator_iterate_sinkpads: - * @self: The #GstAggregator - * @func: (scope call): The function to call. - * @user_data: (closure): The data to pass to @func. - * - * Iterate the sinkpads of aggregator to call a function on them. - * - * This method guarantees that @func will be called only once for each - * sink pad. - */ -gboolean -gst_aggregator_iterate_sinkpads (GstAggregator * self, - GstAggregatorPadForeachFunc func, gpointer user_data) -{ - gboolean result = FALSE; - GstIterator *iter; - gboolean done = FALSE; - GValue item = { 0, }; - GList *seen_pads = NULL; - - iter = gst_element_iterate_sink_pads (GST_ELEMENT (self)); - - if (!iter) - goto no_iter; - - while (!done) { - switch (gst_iterator_next (iter, &item)) { - case GST_ITERATOR_OK: - { - GstAggregatorPad *pad; - - pad = g_value_get_object (&item); - - /* if already pushed, skip. FIXME, find something faster to tag pads */ - if (pad == NULL || g_list_find (seen_pads, pad)) { - g_value_reset (&item); - break; - } - - GST_LOG_OBJECT (pad, "calling function %s on pad", - GST_DEBUG_FUNCPTR_NAME (func)); - - result = func (self, pad, user_data); - - done = !result; - - seen_pads = g_list_prepend (seen_pads, pad); - - g_value_reset (&item); - break; - } - case GST_ITERATOR_RESYNC: - gst_iterator_resync (iter); - break; - case GST_ITERATOR_ERROR: - GST_ERROR_OBJECT (self, - "Could not iterate over internally linked pads"); - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; - } - } - g_value_unset (&item); - gst_iterator_free (iter); - - if (seen_pads == NULL) { - GST_DEBUG_OBJECT (self, "No pad seen"); - return FALSE; - } - - g_list_free (seen_pads); - -no_iter: - return result; -} - static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { - return (g_queue_peek_tail (&pad->priv->buffers) == NULL && + return (g_queue_peek_tail (&pad->priv->data) == NULL && pad->priv->clipped_buffer == NULL); } static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { - GstAggregatorPad *pad; + GstAggregatorPad *pad = NULL; GList *l, *sinkpads; gboolean have_buffer = TRUE; - gboolean have_event = FALSE; + gboolean have_event_or_query = FALSE; GST_LOG_OBJECT (self, "checking pads"); @@ -455,7 +419,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self) if (pad->priv->num_buffers == 0) { if (!gst_aggregator_pad_queue_is_empty (pad)) - have_event = TRUE; + have_event_or_query = TRUE; if (!pad->priv->eos) { have_buffer = FALSE; @@ -476,7 +440,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self) PAD_UNLOCK (pad); } - if (!have_buffer && !have_event) + if (!have_buffer && !have_event_or_query) goto pad_not_ready; if (have_buffer) @@ -494,13 +458,13 @@ no_sinkpads: } pad_not_ready: { - if (have_event) + if (have_event_or_query) GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," " but waking up for serialized event"); else GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); GST_OBJECT_UNLOCK (self); - return have_event; + return have_event_or_query; } } @@ -510,7 +474,8 @@ gst_aggregator_reset_flow_values (GstAggregator * self) GST_OBJECT_LOCK (self); self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; - gst_segment_init (&self->segment, GST_FORMAT_TIME); + gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment, + GST_FORMAT_TIME); self->priv->first_buffer = TRUE; GST_OBJECT_UNLOCK (self); } @@ -528,7 +493,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_INFO_OBJECT (self, "pushing stream start"); /* stream-start (FIXME: create id based on input ids) */ g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ()); - if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) { + if (!gst_pad_push_event (GST_PAD (self->srcpad), + gst_event_new_stream_start (s_id))) { GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed"); } self->priv->send_stream_start = FALSE; @@ -538,7 +504,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT, self->priv->srccaps); - if (!gst_pad_push_event (self->srcpad, + if (!gst_pad_push_event (GST_PAD (self->srcpad), gst_event_new_caps (self->priv->srccaps))) { GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed"); } @@ -548,9 +514,12 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) GST_OBJECT_LOCK (self); if (self->priv->send_segment && !self->priv->flush_seeking) { - segment = gst_event_new_segment (&self->segment); + segment = + gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment); if (!self->priv->seqnum) + /* This code-path is in preparation to be able to run without a source + * connected. Then we won't have a seq-num from a segment event. */ self->priv->seqnum = gst_event_get_seqnum (segment); else gst_event_set_seqnum (segment, self->priv->seqnum); @@ -588,17 +557,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) GST_PAD_STREAM_UNLOCK (self->srcpad); } -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +static GstFlowReturn +gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); @@ -616,6 +576,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) } } +/** + * gst_aggregator_finish_buffer: + * @aggregator: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. + */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator); + + g_assert (klass->finish_buffer != NULL); + + return klass->finish_buffer (aggregator, buffer); +} + static void gst_aggregator_push_eos (GstAggregator * self) { @@ -747,77 +726,112 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +typedef struct +{ + gboolean processed_event; + GstFlowReturn flow_ret; +} DoHandleEventsAndQueriesData; + static gboolean -check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, + gpointer user_data) { + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); GstEvent *event = NULL; GstQuery *query = NULL; GstAggregatorClass *klass = NULL; - gboolean *processed_event = user_data; + DoHandleEventsAndQueriesData *data = user_data; do { event = NULL; + query = NULL; PAD_LOCK (pad); - if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } if (pad->priv->clipped_buffer == NULL && - !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { - if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) - event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); - if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->buffers))) - query = g_queue_peek_tail (&pad->priv->buffers); + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); } PAD_UNLOCK (pad); if (event || query) { gboolean ret; - if (processed_event) - *processed_event = TRUE; + data->processed_event = TRUE; if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); gst_event_ref (event); - ret = klass->sink_event (self, pad, event); + ret = klass->sink_event (aggregator, pad, event); PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) { pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->buffers) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); + if (!ret) + pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED; + } + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); gst_event_unref (event); - } - - if (query) { - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - ret = klass->sink_query (self, pad, query); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (aggregator, pad, query); PAD_LOCK (pad); - if (g_queue_peek_tail (&pad->priv->buffers) == query) { + if (g_queue_peek_tail (&pad->priv->data) == query) { GstStructure *s; s = gst_query_writable_structure (query); gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, NULL); - g_queue_pop_tail (&pad->priv->buffers); + g_queue_pop_tail (&pad->priv->data); } } PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); } - if (query) { - if (processed_event) - *processed_event = TRUE; - if (klass == NULL) - klass = GST_AGGREGATOR_GET_CLASS (self); + } while (event || query); + + return TRUE; +} + +static gboolean +gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GList *item; + GstAggregatorPad *aggpad = (GstAggregatorPad *) epad; + GstAggregator *agg = (GstAggregator *) self; + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + if (!klass->skip_buffer) + return FALSE; + + PAD_LOCK (aggpad); + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + if (GST_IS_BUFFER (item->data) + && klass->skip_buffer (aggpad, agg, item->data)) { + GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); + gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data)); + gst_buffer_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } else { + break; } - } while (event != NULL); + + item = next; + } + + PAD_UNLOCK (aggpad); return TRUE; } @@ -834,7 +848,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, else aggpad->priv->flow_return = flow_return; - item = g_queue_peek_head_link (&aggpad->priv->buffers); + item = g_queue_peek_head_link (&aggpad->priv->data); while (item) { GList *next = item->next; @@ -847,7 +861,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, !GST_EVENT_IS_STICKY (item->data)) { if (!GST_IS_QUERY (item->data)) gst_mini_object_unref (item->data); - g_queue_delete_link (&aggpad->priv->buffers, item); + g_queue_delete_link (&aggpad->priv->data, item); } item = next; } @@ -1099,15 +1113,32 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { GstFlowReturn flow_return = GST_FLOW_OK; - gboolean processed_event = FALSE; + DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK }; + + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &events_query_data); - gst_aggregator_iterate_sinkpads (self, check_events, NULL); + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; + if (self->priv->peer_latency_live) + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_pad_skip_buffers, NULL); + + /* Ensure we have buffers ready (either in clipped_buffer or at the head of + * the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); - if (processed_event) + events_query_data.processed_event = FALSE; + events_query_data.flow_ret = GST_FLOW_OK; + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &events_query_data); + + if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) + goto handle_error; + + if (events_query_data.processed_event) continue; if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { @@ -1137,6 +1168,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) gst_aggregator_push_eos (self); } + handle_error: GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); if (flow_return != GST_FLOW_OK) { @@ -1319,38 +1351,37 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, static void update_time_level (GstAggregatorPad * aggpad, gboolean head) { + GstAggregatorPadPrivate *priv = aggpad->priv; + if (head) { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && - aggpad->priv->head_segment.format == GST_FORMAT_TIME) - aggpad->priv->head_time = - gst_segment_to_running_time (&aggpad->priv->head_segment, - GST_FORMAT_TIME, aggpad->priv->head_position); + if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && + priv->head_segment.format == GST_FORMAT_TIME) + priv->head_time = gst_segment_to_running_time (&priv->head_segment, + GST_FORMAT_TIME, priv->head_position); else - aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + priv->head_time = GST_CLOCK_TIME_NONE; - if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time)) - aggpad->priv->tail_time = aggpad->priv->head_time; + if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) + priv->tail_time = priv->head_time; } else { - if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && + if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && aggpad->segment.format == GST_FORMAT_TIME) - aggpad->priv->tail_time = - gst_segment_to_running_time (&aggpad->segment, - GST_FORMAT_TIME, aggpad->priv->tail_position); + priv->tail_time = gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, priv->tail_position); else - aggpad->priv->tail_time = aggpad->priv->head_time; + priv->tail_time = priv->head_time; } - if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE || - aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) { - aggpad->priv->time_level = 0; + if (priv->head_time == GST_CLOCK_TIME_NONE || + priv->tail_time == GST_CLOCK_TIME_NONE) { + priv->time_level = 0; return; } - if (aggpad->priv->tail_time > aggpad->priv->head_time) - aggpad->priv->time_level = 0; + if (priv->tail_time > priv->head_time) + priv->time_level = 0; else - aggpad->priv->time_level = aggpad->priv->head_time - - aggpad->priv->tail_time; + priv->time_level = priv->head_time - priv->tail_time; } @@ -1363,6 +1394,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { @@ -1373,8 +1406,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_FLUSH_STOP: { - GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); - gst_aggregator_pad_flush (aggpad, self); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -1406,21 +1437,11 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_EOS: { - GST_DEBUG_OBJECT (aggpad, "EOS"); - - /* We still have a buffer, and we don't want the subclass to have to - * check for it. Mark pending_eos, eos will be set when steal_buffer is - * called - */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->num_buffers == 0) { - aggpad->priv->eos = TRUE; - } else { - aggpad->priv->pending_eos = TRUE; - } + g_assert (aggpad->priv->num_buffers == 0); + aggpad->priv->eos = TRUE; PAD_UNLOCK (aggpad); - SRC_BROADCAST (self); SRC_UNLOCK (self); goto eat; @@ -1454,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstBuffer *gapbuf; gst_event_parse_gap (event, &pts, &duration); - gapbuf = gst_buffer_new (); if (GST_CLOCK_TIME_IS_VALID (duration)) endpts = pts + duration; @@ -1476,11 +1496,18 @@ gst_aggregator_default_sink_event (GstAggregator * self, else duration = GST_CLOCK_TIME_NONE; + gapbuf = gst_buffer_new (); GST_BUFFER_PTS (gapbuf) = pts; GST_BUFFER_DURATION (gapbuf) = duration; GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + /* Remove GAP event so we can replace it with the buffer */ + PAD_LOCK (aggpad); + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + PAD_UNLOCK (aggpad); + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); @@ -1490,19 +1517,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, goto eat; } case GST_EVENT_TAG: - { - GstTagList *tags; - - gst_event_parse_tag (event, &tags); - - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; - goto eat; - } - break; - } + goto eat; default: { break; @@ -1520,11 +1535,13 @@ eat: return res; } -static inline gboolean -gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, - gpointer unused_udata) +static gboolean +gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) { - gst_aggregator_pad_flush (pad, self); + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *agg = GST_AGGREGATOR_CAST (self); + + gst_aggregator_pad_flush (pad, agg); PAD_LOCK (pad); pad->priv->flow_return = GST_FLOW_FLUSHING; @@ -1543,7 +1560,9 @@ gst_aggregator_stop (GstAggregator * agg) gst_aggregator_reset_flow_values (agg); - gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL); + /* Application needs to make sure no pads are added while it shuts us down */ + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), + gst_aggregator_stop_pad, NULL); klass = GST_AGGREGATOR_GET_CLASS (agg); @@ -1554,7 +1573,7 @@ gst_aggregator_stop (GstAggregator * agg) agg->priv->has_peer_latency = FALSE; agg->priv->peer_latency_live = FALSE; - agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); @@ -1638,6 +1657,9 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GstAggregatorPrivate *priv = self->priv; gint serial = 0; gchar *name = NULL; + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); if (templ->direction != GST_PAD_SINK) goto not_sink; @@ -1647,18 +1669,27 @@ gst_aggregator_default_create_new_pad (GstAggregator * self, GST_OBJECT_LOCK (self); if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { + || !g_str_has_prefix (req_name, "sink_") + || strrchr (req_name, '%') != NULL) { /* no name given when requesting the pad, use next available int */ serial = ++priv->max_padserial; } else { + gchar *endptr = NULL; + /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial > priv->max_padserial) - priv->max_padserial = serial; + serial = g_ascii_strtoull (&req_name[5], &endptr, 10); + if (endptr != NULL && *endptr == '\0') { + if (serial > priv->max_padserial) { + priv->max_padserial = serial; + } + } else { + serial = ++priv->max_padserial; + } } name = g_strdup_printf ("sink_%u", serial); - agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); + agg_pad = g_object_new (pad_type, "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); g_free (name); @@ -1724,14 +1755,22 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) gst_query_parse_latency (query, &live, &min, &max); - our_latency = self->priv->latency; - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); return FALSE; } + if (self->priv->upstream_latency_min > min) { + GstClockTimeDiff diff = + GST_CLOCK_DIFF (min, self->priv->upstream_latency_min); + + min += diff; + if (GST_CLOCK_TIME_IS_VALID (max)) { + max += diff; + } + } + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" @@ -1740,6 +1779,8 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) return FALSE; } + our_latency = self->priv->latency; + self->priv->peer_latency_live = live; self->priv->peer_latency_min = min; self->priv->peer_latency_max = max; @@ -1841,8 +1882,8 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event) &start, &stop_type, &stop); GST_OBJECT_LOCK (self); - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); + gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, + flags, start_type, start, stop_type, stop, NULL); self->priv->seqnum = gst_event_get_seqnum (event); self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); @@ -1901,7 +1942,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } else { ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); } } @@ -1941,26 +1981,24 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) evdata->result &= ret; + if (peer) + gst_object_unref (peer); + /* Always send to all pads */ return FALSE; } -static EventData +static void gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, - GstEvent * event, gboolean flush, gboolean only_to_active_pads) + EventData * evdata) { - EventData evdata; - - evdata.event = event; - evdata.result = TRUE; - evdata.flush = flush; - evdata.one_actually_seeked = FALSE; - evdata.only_to_active_pads = only_to_active_pads; + evdata->result = TRUE; + evdata->one_actually_seeked = FALSE; /* We first need to set all pads as flushing in a first pass * as flush_start flush_stop is sometimes sent synchronously * while we send the seek event */ - if (flush) { + if (evdata->flush) { GList *l; GST_OBJECT_LOCK (self); @@ -1975,11 +2013,9 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, GST_OBJECT_UNLOCK (self); } - gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata); + gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); - gst_event_unref (event); - - return evdata; + gst_event_unref (evdata->event); } static gboolean @@ -1991,7 +2027,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) GstSeekType start_type, stop_type; gint64 start, stop; gboolean flush; - EventData evdata; + EventData evdata = { 0, }; GstAggregatorPrivate *priv = self->priv; gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, @@ -2007,16 +2043,18 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) priv->flush_seeking = TRUE; } - gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, - stop_type, stop, NULL); + gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt, + flags, start_type, start, stop_type, stop, NULL); /* Seeking sets a position */ self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); /* forward the seek upstream */ - evdata = - gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE); + evdata.event = event; + evdata.flush = flush; + evdata.only_to_active_pads = FALSE; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); event = NULL; if (!evdata.result || !evdata.one_actually_seeked) { @@ -2034,41 +2072,28 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) static gboolean gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) { - EventData evdata; - gboolean res = TRUE; + EventData evdata = { 0, }; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - { - gst_event_ref (event); - res = gst_aggregator_do_seek (self, event); - gst_event_unref (event); - event = NULL; - goto done; - } + /* _do_seek() unrefs the event. */ + return gst_aggregator_do_seek (self, event); case GST_EVENT_NAVIGATION: - { /* navigation is rather pointless. */ - res = FALSE; gst_event_unref (event); - goto done; - } + return FALSE; default: - { break; - } } /* Don't forward QOS events to pads that had no active buffer yet. Otherwise * they will receive a QOS event that has earliest_time=0 (because we can't * have negative timestamps), and consider their buffer as too late */ - evdata = - gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE, - GST_EVENT_TYPE (event) == GST_EVENT_QOS); - res = evdata.result; - -done: - return res; + evdata.event = event; + evdata.flush = FALSE; + evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); + return evdata.result; } static gboolean @@ -2194,7 +2219,7 @@ gst_aggregator_finalize (GObject * object) * as unresponsive. */ static void -gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) +gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) { gboolean changed; @@ -2245,12 +2270,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) * before a pad is deemed unresponsive. A value of -1 means an * unlimited time. */ -static gint64 +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg) { - gint64 res; + GstClockTime res; - g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); GST_OBJECT_LOCK (agg); res = agg->priv->latency; @@ -2267,7 +2292,12 @@ gst_aggregator_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); + gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); + break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + agg->priv->upstream_latency_min = g_value_get_uint64 (value); + SRC_UNLOCK (agg); break; case PROP_START_TIME_SELECTION: agg->priv->start_time_selection = g_value_get_enum (value); @@ -2289,7 +2319,12 @@ gst_aggregator_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); + g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); + break; + case PROP_MIN_UPSTREAM_LATENCY: + SRC_LOCK (agg); + g_value_set_uint64 (value, agg->priv->upstream_latency_min); + SRC_UNLOCK (agg); break; case PROP_START_TIME_SELECTION: g_value_set_enum (value, agg->priv->start_time_selection); @@ -2311,12 +2346,14 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GstElementClass *gstelement_class = (GstElementClass *) klass; aggregator_parent_class = g_type_class_peek_parent (klass); - g_type_class_add_private (klass, sizeof (GstAggregatorPrivate)); GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); - klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD; + if (aggregator_private_offset != 0) + g_type_class_adjust_private_offset (klass, &aggregator_private_offset); + + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; klass->sink_query = gst_aggregator_default_sink_query; @@ -2342,11 +2379,31 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->finalize = gst_aggregator_finalize; g_object_class_install_property (gobject_class, PROP_LATENCY, - g_param_spec_int64 ("latency", "Buffer latency", + g_param_spec_uint64 ("latency", "Buffer latency", "Additional latency in live mode to allow upstream " "to take longer to produce buffers for the current " - "position (in nanoseconds)", 0, - (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), + "position (in nanoseconds)", 0, G_MAXUINT64, + DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregator:min-upstream-latency: + * + * Force minimum upstream latency (in nanoseconds). When sources with a + * higher latency are expected to be plugged in dynamically after the + * aggregator has started playing, this allows overriding the minimum + * latency reported by the initial source(s). This is only taken into + * account when larger than the actually reported minimum latency. + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY, + g_param_spec_uint64 ("min-upstream-latency", "Buffer latency", + "When sources with a higher latency are expected to be plugged " + "in dynamically after the aggregator has started playing, " + "this allows overriding the minimum latency reported by the " + "initial source(s). This is only taken into account when larger " + "than the actually reported minimum latency. (nanoseconds)", + 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, @@ -2361,8 +2418,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "Start time to use if start-time-selection=set", 0, G_MAXUINT64, DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +} - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); +static inline gpointer +gst_aggregator_get_instance_private (GstAggregator * self) +{ + return (G_STRUCT_MEMBER_P (self, aggregator_private_offset)); } static void @@ -2370,12 +2431,11 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) { GstPadTemplate *pad_template; GstAggregatorPrivate *priv; + GType pad_type; g_return_if_fail (klass->aggregate != NULL); - self->priv = - G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR, - GstAggregatorPrivate); + self->priv = gst_aggregator_get_instance_private (self); priv = self->priv; @@ -2390,9 +2450,17 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) self->priv->peer_latency_min = self->priv->sub_latency_min = 0; self->priv->peer_latency_max = self->priv->sub_latency_max = 0; self->priv->has_peer_latency = FALSE; - gst_aggregator_reset_flow_values (self); - self->srcpad = gst_pad_new_from_template (pad_template, "src"); + pad_type = + GST_PAD_TEMPLATE_GTYPE (pad_template) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : + GST_PAD_TEMPLATE_GTYPE (pad_template); + g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD)); + self->srcpad = + g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC, + "template", pad_template, NULL); + + gst_aggregator_reset_flow_values (self); gst_pad_set_event_function (self->srcpad, GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func)); @@ -2403,6 +2471,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); + self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY; self->priv->latency = DEFAULT_LATENCY; self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; self->priv->start_time = DEFAULT_START_TIME; @@ -2434,6 +2503,10 @@ gst_aggregator_get_type (void) _type = g_type_register_static (GST_TYPE_ELEMENT, "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT); + + aggregator_private_offset = + g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate)); + g_once_init_leave (&type, _type); } return type; @@ -2490,6 +2563,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) update_time_level (aggpad, head); } +/* + * Can be called either from the sinkpad's chain function or from the srcpad's + * thread in the case of a buffer synthetized from a GAP event. + * Because of this second case, FLUSH_LOCK can't be used here. + */ + static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) @@ -2497,18 +2576,11 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GstFlowReturn flow_return; GstClockTime buf_pts; - GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - - PAD_FLUSH_LOCK (aggpad); - PAD_LOCK (aggpad); flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) goto flushing; - if (aggpad->priv->pending_eos == TRUE) - goto eos; - PAD_UNLOCK (aggpad); buf_pts = GST_BUFFER_PTS (buffer); @@ -2523,12 +2595,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, aggpad->priv->first_buffer = FALSE; } - if (gst_aggregator_pad_has_space (self, aggpad) + if ((gst_aggregator_pad_has_space (self, aggpad) || !head) && aggpad->priv->flow_return == GST_FLOW_OK) { if (head) - g_queue_push_head (&aggpad->priv->buffers, buffer); + g_queue_push_head (&aggpad->priv->data, buffer); else - g_queue_push_tail (&aggpad->priv->buffers, buffer); + g_queue_push_tail (&aggpad->priv->data, buffer); apply_buffer (aggpad, buffer, head); aggpad->priv->num_buffers++; buffer = NULL; @@ -2552,6 +2624,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, if (self->priv->first_buffer) { GstClockTime start_time; + GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad); switch (self->priv->start_time_selection) { case GST_AGGREGATOR_START_TIME_SELECTION_ZERO: @@ -2585,10 +2658,10 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, } if (start_time != -1) { - if (self->segment.position == -1) - self->segment.position = start_time; + if (srcpad->segment.position == -1) + srcpad->segment.position = start_time; else - self->segment.position = MIN (start_time, self->segment.position); + srcpad->segment.position = MIN (start_time, srcpad->segment.position); GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, GST_TIME_ARGS (start_time)); @@ -2599,15 +2672,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); - PAD_FLUSH_UNLOCK (aggpad); - GST_DEBUG_OBJECT (aggpad, "Done chaining"); return flow_return; flushing: PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", gst_flow_get_name (flow_return)); @@ -2615,22 +2685,22 @@ flushing: gst_buffer_unref (buffer); return flow_return; - -eos: - PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are EOS already..."); - - return GST_FLOW_EOS; } static GstFlowReturn gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) { - return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), - GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE); + GstFlowReturn ret; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + PAD_FLUSH_LOCK (aggpad); + + ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + aggpad, buffer, TRUE); + + PAD_FLUSH_UNLOCK (aggpad); + + return ret; } static gboolean @@ -2639,7 +2709,6 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, { GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { GstStructure *s; @@ -2653,7 +2722,7 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, goto flushing; } - g_queue_push_head (&aggpad->priv->buffers, query); + g_queue_push_head (&aggpad->priv->data, query); SRC_BROADCAST (self); SRC_UNLOCK (self); @@ -2667,7 +2736,7 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) gst_structure_remove_field (s, "gst-aggregator-retval"); else - g_queue_remove (&aggpad->priv->buffers, query); + g_queue_remove (&aggpad->priv->data, query); if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; @@ -2675,9 +2744,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, PAD_UNLOCK (aggpad); return ret; - } + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - return klass->sink_query (self, aggpad, query); + return klass->sink_query (self, aggpad, query); + } flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", @@ -2687,6 +2758,10 @@ flushing: return FALSE; } +/* Queue serialized events and let the others go through directly. + * The queued events with be handled from the src-pad task in + * gst_aggregator_do_events_and_queries(). + */ static GstFlowReturn gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) @@ -2694,18 +2769,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstFlowReturn ret = GST_FLOW_OK; GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) { + if (GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { SRC_LOCK (self); PAD_LOCK (aggpad); - if (aggpad->priv->flow_return != GST_FLOW_OK - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - ret = aggpad->priv->flow_return; + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; - } if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GST_OBJECT_LOCK (aggpad); @@ -2715,18 +2786,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GST_OBJECT_UNLOCK (aggpad); } - if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, - event); - g_queue_push_head (&aggpad->priv->buffers, event); - event = NULL; - SRC_BROADCAST (self); - } + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); PAD_UNLOCK (aggpad); SRC_UNLOCK (self); - } + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (event) { if (!klass->sink_event (self, aggpad, event)) { /* Copied from GstPad to convert boolean to a GstFlowReturn in * the event handling func */ @@ -2745,7 +2812,7 @@ flushing: gst_pad_store_sticky_event (pad, event); gst_event_unref (event); - return ret; + return aggpad->priv->flow_return; } static gboolean @@ -2773,21 +2840,39 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, /*********************************** * GstAggregatorPad implementation * ************************************/ -G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); +G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); + +#define DEFAULT_PAD_EMIT_SIGNALS FALSE + +enum +{ + PAD_PROP_0, + PAD_PROP_EMIT_SIGNALS, +}; + +enum +{ + PAD_SIGNAL_BUFFER_CONSUMED, + PAD_LAST_SIGNAL, +}; + +static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 }; static void gst_aggregator_pad_constructed (GObject * object) { GstPad *pad = GST_PAD (object); - gst_pad_set_chain_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); - gst_pad_set_event_full_function_full (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); - gst_pad_set_query_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); - gst_pad_set_activatemode_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); + if (GST_PAD_IS_SINK (pad)) { + gst_pad_set_chain_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); + gst_pad_set_event_full_function_full (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); + gst_pad_set_query_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); + gst_pad_set_activatemode_function (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func)); + } } static void @@ -2813,25 +2898,82 @@ gst_aggregator_pad_dispose (GObject * object) } static void +gst_aggregator_pad_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + pad->priv->emit_signals = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_pad_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + g_value_set_boolean (value, pad->priv->emit_signals); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate)); - gobject_class->constructed = gst_aggregator_pad_constructed; gobject_class->finalize = gst_aggregator_pad_finalize; gobject_class->dispose = gst_aggregator_pad_dispose; + gobject_class->set_property = gst_aggregator_pad_set_property; + gobject_class->get_property = gst_aggregator_pad_get_property; + + /** + * GstAggregatorPad:buffer-consumed: + * + * Signals that a buffer was consumed. As aggregator pads store buffers + * in an internal queue, there is no direct match between input and output + * buffers at any given time. This signal can be useful to forward metas + * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time. + * + * Since: 1.16 + */ + gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] = + g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_BUFFER); + + /** + * GstAggregatorPad:emit-signals: + * + * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS, + g_param_spec_boolean ("emit-signals", "Emit signals", + "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void gst_aggregator_pad_init (GstAggregatorPad * pad) { - pad->priv = - G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, - GstAggregatorPadPrivate); + pad->priv = gst_aggregator_pad_get_instance_private (pad); - g_queue_init (&pad->priv->buffers); + g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); @@ -2839,17 +2981,18 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) gst_aggregator_pad_reset_unlocked (pad); pad->priv->negotiated = FALSE; + pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS; } /* Must be called with the PAD_LOCK held */ static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) { pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); - if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; + GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer); + if (buffer && pad->priv->emit_signals) { + g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], + 0, buffer); } PAD_BROADCAST_EVENT (pad); } @@ -2863,8 +3006,8 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) GstBuffer *buffer = NULL; while (pad->priv->clipped_buffer == NULL && - GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { - buffer = g_queue_pop_tail (&pad->priv->buffers); + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); apply_buffer (pad, buffer, FALSE); @@ -2887,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); } } @@ -2900,7 +3043,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_steal_buffer: + * gst_aggregator_pad_pop_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. @@ -2909,19 +3052,24 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; - pad->priv->clipped_buffer = NULL; if (buffer) { - gst_aggregator_pad_buffer_consumed (pad); + pad->priv->clipped_buffer = NULL; + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } @@ -2943,7 +3091,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) { GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); + buf = gst_aggregator_pad_pop_buffer (pad); if (buf == NULL) return FALSE; @@ -2953,7 +3101,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_get_buffer: + * gst_aggregator_pad_peek_buffer: * @pad: the pad to get buffer from * * Returns: (transfer full): A reference to the buffer in @pad or @@ -2961,12 +3109,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) * usage. */ GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { GstBuffer *buffer; PAD_LOCK (pad); + if (pad->priv->flow_return != GST_FLOW_OK) { + PAD_UNLOCK (pad); + return NULL; + } + gst_aggregator_pad_clip_buffer_unlocked (pad); if (pad->priv->clipped_buffer) { @@ -2979,6 +3132,37 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) return buffer; } +/** + * gst_aggregator_pad_has_buffer: + * @pad: the pad to check the buffer on + * + * This checks if a pad has a buffer available that will be returned by + * a call to gst_aggregator_pad_peek_buffer() or + * gst_aggregator_pad_pop_buffer(). + * + * Returns: %TRUE if the pad has a buffer available as the next thing. + * + * Since: 1.14.1 + */ +gboolean +gst_aggregator_pad_has_buffer (GstAggregatorPad * pad) +{ + gboolean has_buffer; + + PAD_LOCK (pad); + gst_aggregator_pad_clip_buffer_unlocked (pad); + has_buffer = (pad->priv->clipped_buffer != NULL); + PAD_UNLOCK (pad); + + return has_buffer; +} + +/** + * gst_aggregator_pad_is_eos: + * @pad: an aggregator pad + * + * Returns: %TRUE if the pad is EOS, otherwise %FALSE. + */ gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad) { @@ -2991,7 +3175,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } -/** +#if 0 +/* * gst_aggregator_merge_tags: * @self: a #GstAggregator * @tags: a #GstTagList to merge @@ -3025,6 +3210,7 @@ gst_aggregator_merge_tags (GstAggregator * self, self->priv->tags_changed = TRUE; GST_OBJECT_UNLOCK (self); } +#endif /** * gst_aggregator_set_latency: @@ -3115,3 +3301,40 @@ gst_aggregator_get_allocator (GstAggregator * self, if (params) *params = self->priv->allocation_params; } + +/** + * gst_aggregator_simple_get_next_time: + * @self: A #GstAggregator + * + * This is a simple #GstAggregator::get_next_time implementation that + * just looks at the #GstSegment on the srcpad of the aggregator and bases + * the next time on the running time there. + * + * This is the desired behaviour in most cases where you have a live source + * and you have a dead line based aggregator subclass. + * + * Returns: The running time based on the position + * + * Since: 1.16 + */ +GstClockTime +gst_aggregator_simple_get_next_time (GstAggregator * self) +{ + GstClockTime next_time; + GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad); + GstSegment *segment = &srcpad->segment; + + GST_OBJECT_LOCK (self); + if (segment->position == -1 || segment->position < segment->start) + next_time = segment->start; + else + next_time = segment->position; + + if (segment->stop != -1 && next_time > segment->stop) + next_time = segment->stop; + + next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time); + GST_OBJECT_UNLOCK (self); + + return next_time; +}