X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=794657a41929a99e15d50ecfc92a0c00f78be642;hb=9f69034d4155c89f71e3f885b39a9e949768b606;hp=1e8d307e468232e6222c9354a9085dc8b22978a3;hpb=5ac166a5d9ead73bd2bce67fe854ed5d13dfa276;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 1e8d307..794657a 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -21,40 +21,62 @@ */ /** * SECTION: gstaggregator - * @short_description: manages a set of pads with the purpose of - * aggregating their buffers. + * @title: GstAggregator + * @short_description: Base class for mixers and muxers, manages a set of input + * pads and aggregates their streams * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. * Control is given to the subclass when all pads have data. - * - * - * Base class for mixers and muxers. Subclasses should at least implement + * + * * Base class for mixers and muxers. Subclasses should at least implement * the #GstAggregatorClass.aggregate() virtual method. - * - * - * When data is queued on all pads, tha aggregate vmethod is called. - * - * - * One can peek at the data on any given GstAggregatorPad with the - * gst_aggregator_pad_get_buffer () method, and take ownership of it - * with the gst_aggregator_pad_steal_buffer () method. When a buffer - * has been taken with steal_buffer (), a new buffer can be queued + * + * * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a + * #GstPadQueryFunction to queue all serialized data packets per sink pad. + * Subclasses should not overwrite those, but instead implement + * #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as + * needed. + * + * * When data is queued on all pads, the aggregate vmethod is called. + * + * * One can peek at the data on any given GstAggregatorPad with the + * gst_aggregator_pad_peek_buffer () method, and remove it from the pad + * with the gst_aggregator_pad_pop_buffer () method. When a buffer + * has been taken with pop_buffer (), a new buffer can be queued * on that pad. - * - * - * If the subclass wishes to push a buffer downstream in its aggregate + * + * * If the subclass wishes to push a buffer downstream in its aggregate * implementation, it should do so through the * gst_aggregator_finish_buffer () method. This method will take care * of sending and ordering mandatory events such as stream start, caps * and segment. - * - * - * Same goes for EOS events, which should not be pushed directly by the + * + * * Same goes for EOS events, which should not be pushed directly by the * subclass, it should instead return GST_FLOW_EOS in its aggregate * implementation. - * - * + * + * * Note that the aggregator logic regarding gap event handling is to turn + * these into gap buffers with matching PTS and duration. It will also + * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE + * to ease their identification and subsequent processing. + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 + */ + +/** + * SECTION: gstaggregatorpad + * @title: GstAggregatorPad + * @short_description: #GstPad subclass for pads managed by #GstAggregator + * @see_also: gstcollectpads for historical reasons. + * + * Pads managed by a #GstAggregor subclass. + * + * This class used to live in gst-plugins-bad and was moved to core. + * + * Since: 1.14 */ #ifdef HAVE_CONFIG_H @@ -65,14 +87,47 @@ #include "gstaggregator.h" +typedef enum +{ + GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + GST_AGGREGATOR_START_TIME_SELECTION_SET +} GstAggregatorStartTimeSelection; + +static GType +gst_aggregator_start_time_selection_get_type (void) +{ + static GType gtype = 0; + + if (gtype == 0) { + static const GEnumValue values[] = { + {GST_AGGREGATOR_START_TIME_SELECTION_ZERO, + "Start at 0 running time (default)", "zero"}, + {GST_AGGREGATOR_START_TIME_SELECTION_FIRST, + "Start at first observed input running time", "first"}, + {GST_AGGREGATOR_START_TIME_SELECTION_SET, + "Set start time with start-time property", "set"}, + {0, NULL, NULL} + }; + + gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values); + } + return gtype; +} /* Might become API */ +#if 0 static void gst_aggregator_merge_tags (GstAggregator * aggregator, const GstTagList * tags, GstTagMergeMode mode); +#endif static void gst_aggregator_set_latency_property (GstAggregator * agg, - gint64 latency); -static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); + GstClockTime latency); +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); +static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); + +GST_DEBUG_CATEGORY_STATIC (aggregator_debug); +#define GST_CAT_DEFAULT aggregator_debug /* Locking order, locks in this element must always be taken in this order * @@ -86,10 +141,6 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg); * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad) */ - -GST_DEBUG_CATEGORY_STATIC (aggregator_debug); -#define GST_CAT_DEFAULT aggregator_debug - /* GstAggregatorPad definitions */ #define PAD_LOCK(pad) G_STMT_START { \ GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p", \ @@ -109,16 +160,16 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define PAD_WAIT_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p", \ + GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p", \ g_thread_self()); \ g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond), \ (&((GstAggregatorPad*)pad)->priv->lock)); \ - GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p", \ + GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \ g_thread_self()); \ } G_STMT_END #define PAD_BROADCAST_EVENT(pad) G_STMT_START { \ - GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p", \ + GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p", \ g_thread_self()); \ g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \ } G_STMT_END @@ -174,15 +225,28 @@ GST_DEBUG_CATEGORY_STATIC (aggregator_debug); struct _GstAggregatorPadPrivate { - /* To always be used atomically */ - gboolean flushing; - /* Following fields are protected by the PAD_LOCK */ + GstFlowReturn flow_return; gboolean pending_flush_start; gboolean pending_flush_stop; - gboolean pending_eos; - GstBuffer *buffer; + gboolean first_buffer; + + GQueue data; /* buffers, events and queries */ + GstBuffer *clipped_buffer; + guint num_buffers; + + /* used to track fill state of queues, only used with live-src and when + * latency property is set to > 0 */ + GstClockTime head_position; + GstClockTime tail_position; + GstClockTime head_time; /* running time */ + GstClockTime tail_time; + GstClockTime time_level; /* how much head is ahead of tail */ + GstSegment head_segment; /* segment before the queue */ + + gboolean negotiated; + gboolean eos; GMutex lock; @@ -193,14 +257,31 @@ struct _GstAggregatorPadPrivate GMutex flush_lock; }; +/* Must be called with PAD_LOCK held */ +static void +gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) +{ + aggpad->priv->eos = FALSE; + aggpad->priv->flow_return = GST_FLOW_OK; + GST_OBJECT_LOCK (aggpad); + gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (aggpad); + aggpad->priv->head_position = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; + aggpad->priv->time_level = 0; + aggpad->priv->first_buffer = TRUE; +} + static gboolean gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) { GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); PAD_LOCK (aggpad); - aggpad->priv->eos = FALSE; - aggpad->priv->flushing = FALSE; + gst_aggregator_pad_reset_unlocked (aggpad); PAD_UNLOCK (aggpad); if (klass->flush) @@ -218,27 +299,29 @@ static GstElementClass *aggregator_parent_class = NULL; struct _GstAggregatorPrivate { - gint padcount; + gint max_padserial; /* Our state is >= PAUSED */ gboolean running; /* protected by src_lock */ + /* seqnum from seek or segment, + * to be applied to synthetic segment/eos events */ gint seqnum; gboolean send_stream_start; /* protected by srcpad stream lock */ gboolean send_segment; gboolean flush_seeking; gboolean pending_flush_start; gboolean send_eos; /* protected by srcpad stream lock */ - GstFlowReturn flow_return; GstCaps *srccaps; /* protected by the srcpad stream lock */ GstTagList *tags; gboolean tags_changed; - gboolean latency_live; /* protected by src_lock */ - GstClockTime latency_min; /* protected by src_lock */ - GstClockTime latency_max; /* protected by src_lock */ + gboolean peer_latency_live; /* protected by src_lock */ + GstClockTime peer_latency_min; /* protected by src_lock */ + GstClockTime peer_latency_max; /* protected by src_lock */ + gboolean has_peer_latency; /* protected by src_lock */ GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */ @@ -248,112 +331,63 @@ struct _GstAggregatorPrivate GMutex src_lock; GCond src_cond; + gboolean first_buffer; /* protected by object lock */ + GstAggregatorStartTimeSelection start_time_selection; + GstClockTime start_time; + + /* protected by the object lock */ + GstQuery *allocation_query; + GstAllocator *allocator; + GstBufferPool *pool; + GstAllocationParams allocation_params; + /* properties */ - gint64 latency; + gint64 latency; /* protected by both src_lock and all pad locks */ }; +/* Seek event forwarding helper */ typedef struct { + /* parameters */ GstEvent *event; - gboolean result; gboolean flush; + gboolean only_to_active_pads; + /* results */ + gboolean result; gboolean one_actually_seeked; } EventData; -#define DEFAULT_LATENCY 0 +#define DEFAULT_LATENCY 0 +#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO +#define DEFAULT_START_TIME (-1) enum { PROP_0, PROP_LATENCY, + PROP_START_TIME_SELECTION, + PROP_START_TIME, PROP_LAST }; -/** - * gst_aggregator_iterate_sinkpads: - * @self: The #GstAggregator - * @func: (scope call): The function to call. - * @user_data: (closure): The data to pass to @func. - * - * Iterate the sinkpads of aggregator to call a function on them. - * - * This method guarantees that @func will be called only once for each - * sink pad. - */ -gboolean -gst_aggregator_iterate_sinkpads (GstAggregator * self, - GstAggregatorPadForeachFunc func, gpointer user_data) -{ - gboolean result = FALSE; - GstIterator *iter; - gboolean done = FALSE; - GValue item = { 0, }; - GList *seen_pads = NULL; - - iter = gst_element_iterate_sink_pads (GST_ELEMENT (self)); - - if (!iter) - goto no_iter; - - while (!done) { - switch (gst_iterator_next (iter, &item)) { - case GST_ITERATOR_OK: - { - GstAggregatorPad *pad; - - pad = g_value_get_object (&item); - - /* if already pushed, skip. FIXME, find something faster to tag pads */ - if (pad == NULL || g_list_find (seen_pads, pad)) { - g_value_reset (&item); - break; - } - - GST_LOG_OBJECT (pad, "calling function %s on pad", - GST_DEBUG_FUNCPTR_NAME (func)); - - result = func (self, pad, user_data); - - done = !result; - - seen_pads = g_list_prepend (seen_pads, pad); - - g_value_reset (&item); - break; - } - case GST_ITERATOR_RESYNC: - gst_iterator_resync (iter); - break; - case GST_ITERATOR_ERROR: - GST_ERROR_OBJECT (self, - "Could not iterate over internally linked pads"); - done = TRUE; - break; - case GST_ITERATOR_DONE: - done = TRUE; - break; - } - } - g_value_unset (&item); - gst_iterator_free (iter); - - if (seen_pads == NULL) { - GST_DEBUG_OBJECT (self, "No pad seen"); - return FALSE; - } +static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); - g_list_free (seen_pads); - -no_iter: - return result; +static gboolean +gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) +{ + return (g_queue_peek_tail (&pad->priv->data) == NULL && + pad->priv->clipped_buffer == NULL); } static gboolean gst_aggregator_check_pads_ready (GstAggregator * self) { - GstAggregatorPad *pad; + GstAggregatorPad *pad = NULL; GList *l, *sinkpads; + gboolean have_buffer = TRUE; + gboolean have_event_or_query = FALSE; GST_LOG_OBJECT (self, "checking pads"); @@ -367,14 +401,36 @@ gst_aggregator_check_pads_ready (GstAggregator * self) pad = l->data; PAD_LOCK (pad); - if (pad->priv->buffer == NULL && !pad->priv->eos) { - PAD_UNLOCK (pad); - goto pad_not_ready; + + if (pad->priv->num_buffers == 0) { + if (!gst_aggregator_pad_queue_is_empty (pad)) + have_event_or_query = TRUE; + if (!pad->priv->eos) { + have_buffer = FALSE; + + /* If not live we need data on all pads, so leave the loop */ + if (!self->priv->peer_latency_live) { + PAD_UNLOCK (pad); + goto pad_not_ready; + } + } + } else if (self->priv->peer_latency_live) { + /* In live mode, having a single pad with buffers is enough to + * generate a start time from it. In non-live mode all pads need + * to have a buffer + */ + self->priv->first_buffer = FALSE; } - PAD_UNLOCK (pad); + PAD_UNLOCK (pad); } + if (!have_buffer && !have_event_or_query) + goto pad_not_ready; + + if (have_buffer) + self->priv->first_buffer = FALSE; + GST_OBJECT_UNLOCK (self); GST_LOG_OBJECT (self, "pads are ready"); return TRUE; @@ -387,9 +443,13 @@ no_sinkpads: } pad_not_ready: { - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); + if (have_event_or_query) + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," + " but waking up for serialized event"); + else + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); GST_OBJECT_UNLOCK (self); - return FALSE; + return have_event_or_query; } } @@ -397,10 +457,10 @@ static void gst_aggregator_reset_flow_values (GstAggregator * self) { GST_OBJECT_LOCK (self); - self->priv->flow_return = GST_FLOW_FLUSHING; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; gst_segment_init (&self->segment, GST_FORMAT_TIME); + self->priv->first_buffer = TRUE; GST_OBJECT_UNLOCK (self); } @@ -440,6 +500,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self) segment = gst_event_new_segment (&self->segment); if (!self->priv->seqnum) + /* This code-path is in preparation to be able to run without a source + * connected. Then we won't have a seq-num from a segment event. */ self->priv->seqnum = gst_event_get_seqnum (segment); else gst_event_set_seqnum (segment, self->priv->seqnum); @@ -477,17 +539,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps) GST_PAD_STREAM_UNLOCK (self->srcpad); } -/** - * gst_aggregator_finish_buffer: - * @self: The #GstAggregator - * @buffer: (transfer full): the #GstBuffer to push. - * - * This method will push the provided output buffer downstream. If needed, - * mandatory events such as stream-start, caps, and segment events will be - * sent before pushing the buffer. - */ -GstFlowReturn -gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) +static GstFlowReturn +gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer) { gst_aggregator_push_mandatory_events (self); @@ -505,6 +558,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer) } } +/** + * gst_aggregator_finish_buffer: + * @aggregator: The #GstAggregator + * @buffer: (transfer full): the #GstBuffer to push. + * + * This method will push the provided output buffer downstream. If needed, + * mandatory events such as stream-start, caps, and segment events will be + * sent before pushing the buffer. + */ +GstFlowReturn +gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator); + + g_assert (klass->finish_buffer != NULL); + + return klass->finish_buffer (aggregator, buffer); +} + static void gst_aggregator_push_eos (GstAggregator * self) { @@ -535,15 +607,15 @@ gst_aggregator_get_next_time (GstAggregator * self) static gboolean gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) { - GstClockTime latency_max, latency_min; + GstClockTime latency; GstClockTime start; - gboolean live, res; + gboolean res; *timeout = FALSE; SRC_LOCK (self); - gst_aggregator_get_latency_unlocked (self, &live, &latency_min, &latency_max); + latency = gst_aggregator_get_latency_unlocked (self); if (gst_aggregator_check_pads_ready (self)) { GST_DEBUG_OBJECT (self, "all pads have data"); @@ -561,12 +633,25 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) start = gst_aggregator_get_next_time (self); - if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) - || !GST_CLOCK_TIME_IS_VALID (start)) { + /* If we're not live, or if we use the running time + * of the first buffer as start time, we wait until + * all pads have buffers. + * Otherwise (i.e. if we are live!), we wait on the clock + * and if a pad does not have a buffer in time we ignore + * that pad. + */ + GST_OBJECT_LOCK (self); + if (!GST_CLOCK_TIME_IS_VALID (latency) || + !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || + !GST_CLOCK_TIME_IS_VALID (start) || + (self->priv->first_buffer + && self->priv->start_time_selection == + GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) { /* We wake up here when something happened, and below * then check if we're ready now. If we return FALSE, * we will be directly called again. */ + GST_OBJECT_UNLOCK (self); SRC_WAIT (self); } else { GstClockTime base_time, time; @@ -577,26 +662,21 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT, GST_TIME_ARGS (start)); - GST_OBJECT_LOCK (self); base_time = GST_ELEMENT_CAST (self)->base_time; - clock = GST_ELEMENT_CLOCK (self); - if (clock) - gst_object_ref (clock); + clock = gst_object_ref (GST_ELEMENT_CLOCK (self)); GST_OBJECT_UNLOCK (self); time = base_time + start; - time += latency_min; + time += latency; GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %" GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT - " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT - " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time), - GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time), - GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max), - GST_TIME_ARGS (latency_min), + " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")", + GST_TIME_ARGS (time), + GST_TIME_ARGS (base_time), + GST_TIME_ARGS (start), GST_TIME_ARGS (latency), GST_TIME_ARGS (gst_clock_get_time (clock))); - self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time); gst_object_unref (clock); SRC_UNLOCK (self); @@ -611,9 +691,8 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } GST_DEBUG_OBJECT (self, - "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")", - status, (jitter < 0 ? "-" : " "), - GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter))); + "clock returned %d (jitter: %" GST_STIME_FORMAT ")", + status, GST_STIME_ARGS (jitter)); /* we timed out */ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { @@ -629,6 +708,334 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) return res; } +static gboolean +gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad, + gpointer user_data) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *aggregator = GST_AGGREGATOR_CAST (self); + GstEvent *event = NULL; + GstQuery *query = NULL; + GstAggregatorClass *klass = NULL; + gboolean *processed_event = user_data; + + do { + event = NULL; + query = NULL; + + PAD_LOCK (pad); + if (pad->priv->clipped_buffer == NULL && + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); + } + PAD_UNLOCK (pad); + if (event || query) { + gboolean ret; + + if (processed_event) + *processed_event = TRUE; + if (klass == NULL) + klass = GST_AGGREGATOR_GET_CLASS (self); + + if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + gst_event_ref (event); + ret = klass->sink_event (aggregator, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); + gst_event_unref (event); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (aggregator, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->data) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->data); + } + } + + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); + } + } while (event || query); + + return TRUE; +} + +static void +gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, + GstFlowReturn flow_return, gboolean full) +{ + GList *item; + + PAD_LOCK (aggpad); + if (flow_return == GST_FLOW_NOT_LINKED) + aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return); + else + aggpad->priv->flow_return = flow_return; + + item = g_queue_peek_head_link (&aggpad->priv->data); + while (item) { + GList *next = item->next; + + /* In partial flush, we do like the pad, we get rid of non-sticky events + * and EOS/SEGMENT. + */ + if (full || GST_IS_BUFFER (item->data) || + GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || + GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || + !GST_EVENT_IS_STICKY (item->data)) { + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); + } + item = next; + } + aggpad->priv->num_buffers = 0; + gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); + + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); +} + +static GstFlowReturn +gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps, + GstCaps ** ret) +{ + *ret = gst_caps_ref (caps); + + return GST_FLOW_OK; +} + +static GstCaps * +gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps) +{ + caps = gst_caps_fixate (caps); + + return caps; +} + +static gboolean +gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps) +{ + return TRUE; +} + + +/* takes ownership of the pool, allocator and query */ +static gboolean +gst_aggregator_set_allocation (GstAggregator * self, + GstBufferPool * pool, GstAllocator * allocator, + GstAllocationParams * params, GstQuery * query) +{ + GstAllocator *oldalloc; + GstBufferPool *oldpool; + GstQuery *oldquery; + + GST_DEBUG ("storing allocation query"); + + GST_OBJECT_LOCK (self); + oldpool = self->priv->pool; + self->priv->pool = pool; + + oldalloc = self->priv->allocator; + self->priv->allocator = allocator; + + oldquery = self->priv->allocation_query; + self->priv->allocation_query = query; + + if (params) + self->priv->allocation_params = *params; + else + gst_allocation_params_init (&self->priv->allocation_params); + GST_OBJECT_UNLOCK (self); + + if (oldpool) { + GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool); + gst_buffer_pool_set_active (oldpool, FALSE); + gst_object_unref (oldpool); + } + if (oldalloc) { + gst_object_unref (oldalloc); + } + if (oldquery) { + gst_query_unref (oldquery); + } + return TRUE; +} + + +static gboolean +gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query) +{ + GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); + + if (aggclass->decide_allocation) + if (!aggclass->decide_allocation (self, query)) + return FALSE; + + return TRUE; +} + +static gboolean +gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps) +{ + GstQuery *query; + gboolean result = TRUE; + GstBufferPool *pool = NULL; + GstAllocator *allocator; + GstAllocationParams params; + + /* find a pool for the negotiated caps now */ + GST_DEBUG_OBJECT (self, "doing allocation query"); + query = gst_query_new_allocation (caps, TRUE); + if (!gst_pad_peer_query (self->srcpad, query)) { + /* not a problem, just debug a little */ + GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed"); + } + + GST_DEBUG_OBJECT (self, "calling decide_allocation"); + result = gst_aggregator_decide_allocation (self, query); + + GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result, + query); + + if (!result) + goto no_decide_allocation; + + /* we got configuration from our peer or the decide_allocation method, + * parse them */ + if (gst_query_get_n_allocation_params (query) > 0) { + gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms); + } else { + allocator = NULL; + gst_allocation_params_init (¶ms); + } + + if (gst_query_get_n_allocation_pools (query) > 0) + gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL); + + /* now store */ + result = + gst_aggregator_set_allocation (self, pool, allocator, ¶ms, query); + + return result; + + /* Errors */ +no_decide_allocation: + { + GST_WARNING_OBJECT (self, "Failed to decide allocation"); + gst_query_unref (query); + + return result; + } + +} + +/* WITH SRC_LOCK held */ +static GstFlowReturn +gst_aggregator_update_src_caps (GstAggregator * self) +{ + GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self); + GstCaps *downstream_caps, *template_caps, *caps = NULL; + GstFlowReturn ret = GST_FLOW_OK; + + template_caps = gst_pad_get_pad_template_caps (self->srcpad); + downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps); + + if (gst_caps_is_empty (downstream_caps)) { + GST_INFO_OBJECT (self, "Downstream caps (%" + GST_PTR_FORMAT ") not compatible with pad template caps (%" + GST_PTR_FORMAT ")", downstream_caps, template_caps); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + + g_assert (agg_klass->update_src_caps); + GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT, + downstream_caps); + ret = agg_klass->update_src_caps (self, downstream_caps, &caps); + if (ret < GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Subclass failed to update provided caps"); + goto done; + } + if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) { + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + +#ifdef GST_ENABLE_EXTRA_CHECKS + if (!gst_caps_is_subset (caps, template_caps)) { + GstCaps *intersection; + + GST_ERROR_OBJECT (self, + "update_src_caps returned caps %" GST_PTR_FORMAT + " which are not a real subset of the template caps %" + GST_PTR_FORMAT, caps, template_caps); + g_warning ("%s: update_src_caps returned caps which are not a real " + "subset of the filter caps", GST_ELEMENT_NAME (self)); + + intersection = + gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (caps); + caps = intersection; + } +#endif + + if (gst_caps_is_any (caps)) { + goto done; + } + + if (!gst_caps_is_fixed (caps)) { + g_assert (agg_klass->fixate_src_caps); + + GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps); + if (!(caps = agg_klass->fixate_src_caps (self, caps))) { + GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + } + + if (agg_klass->negotiated_src_caps) { + if (!agg_klass->negotiated_src_caps (self, caps)) { + GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + } + + gst_aggregator_set_src_caps (self, caps); + + if (!gst_aggregator_do_allocation (self, caps)) { + GST_WARNING_OBJECT (self, "Allocation negotiation failed"); + ret = GST_FLOW_NOT_NEGOTIATED; + } + +done: + gst_caps_unref (downstream_caps); + gst_caps_unref (template_caps); + + if (caps) + gst_caps_unref (caps); + + return ret; +} + static void gst_aggregator_aggregate_func (GstAggregator * self) { @@ -643,30 +1050,64 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { - GstFlowReturn flow_return; + GstFlowReturn flow_return = GST_FLOW_OK; + gboolean processed_event = FALSE; + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, NULL); + + /* Ensure we have buffers ready (either in clipped_buffer or at the head of + * the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - GST_TRACE_OBJECT (self, "Actually aggregating!"); + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), + gst_aggregator_do_events_and_queries, &processed_event); + + if (processed_event) + continue; + + if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { + flow_return = gst_aggregator_update_src_caps (self); + if (flow_return != GST_FLOW_OK) + gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self)); + } + + if (timeout || flow_return >= GST_FLOW_OK) { + GST_TRACE_OBJECT (self, "Actually aggregating!"); + flow_return = klass->aggregate (self, timeout); + } - flow_return = klass->aggregate (self, timeout); + if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) + continue; GST_OBJECT_LOCK (self); - if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) - priv->flow_return = GST_FLOW_OK; - else - priv->flow_return = flow_return; + if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { + /* We don't want to set the pads to flushing, but we want to + * stop the thread, so just break here */ + GST_OBJECT_UNLOCK (self); + break; + } GST_OBJECT_UNLOCK (self); - if (flow_return == GST_FLOW_EOS) { + if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) { gst_aggregator_push_eos (self); } GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return)); - if (flow_return != GST_FLOW_OK) + if (flow_return != GST_FLOW_OK) { + GList *item; + + GST_OBJECT_LOCK (self); + for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + + gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE); + } + GST_OBJECT_UNLOCK (self); break; + } } /* Pause the task here, the only ways to get here are: @@ -685,12 +1126,12 @@ gst_aggregator_start (GstAggregator * self) GstAggregatorClass *klass; gboolean result; - self->priv->running = TRUE; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; self->priv->send_eos = TRUE; self->priv->srccaps = NULL; - self->priv->flow_return = GST_FLOW_OK; + + gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL); klass = GST_AGGREGATOR_GET_CLASS (self); @@ -794,10 +1235,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstAggregatorPrivate *priv = self->priv; GstAggregatorPadPrivate *padpriv = aggpad->priv; - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - - /* Remove pad buffer and wake up the streaming thread */ - gst_aggregator_pad_drop_buffer (aggpad); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE); PAD_FLUSH_LOCK (aggpad); PAD_LOCK (aggpad); @@ -818,7 +1256,6 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GST_INFO_OBJECT (self, "Flushing, pausing srcpad task"); gst_aggregator_stop_srcpad_task (self, event); - priv->flow_return = GST_FLOW_OK; GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking"); GST_PAD_STREAM_LOCK (self->srcpad); @@ -833,10 +1270,46 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad, gst_event_unref (event); } PAD_FLUSH_UNLOCK (aggpad); +} + +/* Must be called with the the PAD_LOCK held */ +static void +update_time_level (GstAggregatorPad * aggpad, gboolean head) +{ + GstAggregatorPadPrivate *priv = aggpad->priv; + + if (head) { + if (GST_CLOCK_TIME_IS_VALID (priv->head_position) && + priv->head_segment.format == GST_FORMAT_TIME) + priv->head_time = gst_segment_to_running_time (&priv->head_segment, + GST_FORMAT_TIME, priv->head_position); + else + priv->head_time = GST_CLOCK_TIME_NONE; + + if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time)) + priv->tail_time = priv->head_time; + } else { + if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) && + aggpad->segment.format == GST_FORMAT_TIME) + priv->tail_time = gst_segment_to_running_time (&aggpad->segment, + GST_FORMAT_TIME, priv->tail_position); + else + priv->tail_time = priv->head_time; + } + + if (priv->head_time == GST_CLOCK_TIME_NONE || + priv->tail_time == GST_CLOCK_TIME_NONE) { + priv->time_level = 0; + return; + } - gst_aggregator_pad_drop_buffer (aggpad); + if (priv->tail_time > priv->head_time) + priv->time_level = 0; + else + priv->time_level = priv->head_time - priv->tail_time; } + /* GstAggregator vmethods default implementations */ static gboolean gst_aggregator_default_sink_event (GstAggregator * self, @@ -846,6 +1319,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { @@ -856,8 +1331,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_FLUSH_STOP: { - GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); - gst_aggregator_pad_flush (aggpad, self); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -889,30 +1362,27 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_EOS: { - GST_DEBUG_OBJECT (aggpad, "EOS"); - - /* We still have a buffer, and we don't want the subclass to have to - * check for it. Mark pending_eos, eos will be set when steal_buffer is - * called - */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (!aggpad->priv->buffer) { - aggpad->priv->eos = TRUE; - } else { - aggpad->priv->pending_eos = TRUE; - } + g_assert (aggpad->priv->num_buffers == 0); + aggpad->priv->eos = TRUE; PAD_UNLOCK (aggpad); - SRC_BROADCAST (self); SRC_UNLOCK (self); goto eat; } case GST_EVENT_SEGMENT: { + PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + /* We've got a new segment, tail_position is now meaningless + * and may interfere with the time_level calculation + */ + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; + update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); GST_OBJECT_LOCK (self); self->priv->seqnum = gst_event_get_seqnum (event); @@ -925,25 +1395,52 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_GAP: { - /* FIXME: need API to handle GAP events properly */ - GST_FIXME_OBJECT (self, "implement support for GAP events"); - /* don't forward GAP events downstream */ - goto eat; - } - case GST_EVENT_TAG: - { - GstTagList *tags; + GstClockTime pts, endpts; + GstClockTime duration; + GstBuffer *gapbuf; - gst_event_parse_tag (event, &tags); + gst_event_parse_gap (event, &pts, &duration); + gapbuf = gst_buffer_new (); - if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) { - gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE); - gst_event_unref (event); - event = NULL; + if (GST_CLOCK_TIME_IS_VALID (duration)) + endpts = pts + duration; + else + endpts = GST_CLOCK_TIME_NONE; + + GST_OBJECT_LOCK (aggpad); + res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts, + &pts, &endpts); + GST_OBJECT_UNLOCK (aggpad); + + if (!res) { + GST_WARNING_OBJECT (self, "GAP event outside segment, dropping"); goto eat; } - break; + + if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts)) + duration = endpts - pts; + else + duration = GST_CLOCK_TIME_NONE; + + GST_BUFFER_PTS (gapbuf) = pts; + GST_BUFFER_DURATION (gapbuf) = duration; + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); + GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + + /* Remove GAP event so we can replace it with the buffer */ + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != + GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); + res = FALSE; + } + + goto eat; } + case GST_EVENT_TAG: + goto eat; default: { break; @@ -961,11 +1458,19 @@ eat: return res; } -static inline gboolean -gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, - gpointer unused_udata) +static gboolean +gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data) { - gst_aggregator_pad_flush (pad, self); + GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad); + GstAggregator *agg = GST_AGGREGATOR_CAST (self); + + gst_aggregator_pad_flush (pad, agg); + + PAD_LOCK (pad); + pad->priv->flow_return = GST_FLOW_FLUSHING; + pad->priv->negotiated = FALSE; + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); return TRUE; } @@ -978,7 +1483,9 @@ gst_aggregator_stop (GstAggregator * agg) gst_aggregator_reset_flow_values (agg); - gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL); + /* Application needs to make sure no pads are added while it shuts us down */ + gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg), + gst_aggregator_stop_pad, NULL); klass = GST_AGGREGATOR_GET_CLASS (agg); @@ -987,10 +1494,16 @@ gst_aggregator_stop (GstAggregator * agg) else result = TRUE; + agg->priv->has_peer_latency = FALSE; + agg->priv->peer_latency_live = FALSE; + agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0; + if (agg->priv->tags) gst_tag_list_unref (agg->priv->tags); agg->priv->tags = NULL; + gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL); + return result; } @@ -1046,56 +1559,85 @@ static void gst_aggregator_release_pad (GstElement * element, GstPad * pad) { GstAggregator *self = GST_AGGREGATOR (element); - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GST_INFO_OBJECT (pad, "Removing pad"); SRC_LOCK (self); - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - gst_aggregator_pad_drop_buffer (aggpad); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); gst_element_remove_pad (element, pad); + self->priv->has_peer_latency = FALSE; SRC_BROADCAST (self); SRC_UNLOCK (self); } -static GstPad * -gst_aggregator_request_new_pad (GstElement * element, +static GstAggregatorPad * +gst_aggregator_default_create_new_pad (GstAggregator * self, GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) { - GstAggregator *self; GstAggregatorPad *agg_pad; + GstAggregatorPrivate *priv = self->priv; + gint serial = 0; + gchar *name = NULL; + GType pad_type = + GST_PAD_TEMPLATE_GTYPE (templ) == + G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ); - GstElementClass *klass = GST_ELEMENT_GET_CLASS (element); - GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; + if (templ->direction != GST_PAD_SINK) + goto not_sink; - self = GST_AGGREGATOR (element); + if (templ->presence != GST_PAD_REQUEST) + goto not_request; - if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) { - gint serial = 0; - gchar *name = NULL; + GST_OBJECT_LOCK (self); + if (req_name == NULL || strlen (req_name) < 6 + || !g_str_has_prefix (req_name, "sink_")) { + /* no name given when requesting the pad, use next available int */ + serial = ++priv->max_padserial; + } else { + /* parse serial number from requested padname */ + serial = g_ascii_strtoull (&req_name[5], NULL, 10); + if (serial > priv->max_padserial) + priv->max_padserial = serial; + } - GST_OBJECT_LOCK (element); - if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { - /* no name given when requesting the pad, use next available int */ - priv->padcount++; - } else { - /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial >= priv->padcount) - priv->padcount = serial; - } + name = g_strdup_printf ("sink_%u", serial); + agg_pad = g_object_new (pad_type, + "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); + g_free (name); - name = g_strdup_printf ("sink_%u", priv->padcount); - agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, - "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); - g_free (name); + GST_OBJECT_UNLOCK (self); - GST_OBJECT_UNLOCK (element); + return agg_pad; - } else { + /* errors */ +not_sink: + { + GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad"); + return NULL; + } +not_request: + { + GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad"); + return NULL; + } +} + +static GstPad * +gst_aggregator_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) +{ + GstAggregator *self; + GstAggregatorPad *agg_pad; + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element); + GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; + + self = GST_AGGREGATOR (element); + + agg_pad = klass->create_new_pad (self, templ, req_name, caps); + if (!agg_pad) { + GST_ERROR_OBJECT (element, "Couldn't create new pad"); return NULL; } @@ -1110,189 +1652,120 @@ gst_aggregator_request_new_pad (GstElement * element, return GST_PAD (agg_pad); } -typedef struct -{ - GstClockTime min, max; - gboolean live; -} LatencyData; +/* Must be called with SRC_LOCK held */ static gboolean -query_upstream_latency_fold (const GValue * item, GValue * ret, - gpointer user_data) +gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) { - GstPad *pad = g_value_get_object (item); - LatencyData *data = user_data; - GstClockTime min, max; - GstQuery *query; - gboolean live, res; + gboolean query_ret, live; + GstClockTime our_latency, min, max; - query = gst_query_new_latency (); - res = gst_pad_peer_query (GST_PAD_CAST (pad), query); + query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - if (res) { - gst_query_parse_latency (query, &live, &min, &max); - - GST_LOG_OBJECT (pad, "got latency live:%s min:%" G_GINT64_FORMAT - " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); - - if (live) { - if (min > data->min) - data->min = min; - - if (data->max == GST_CLOCK_TIME_NONE) - data->max = max; - else if (max < data->max) - data->max = max; - - data->live = TRUE; - } - } else { - GST_LOG_OBJECT (pad, "latency query failed"); - g_value_set_boolean (ret, FALSE); - } - - gst_query_unref (query); - - return TRUE; -} - -static gboolean -gst_aggregator_query_latency (GstAggregator * self, GstQuery * query) -{ - GstIterator *it; - GstIteratorResult res; - GValue ret = G_VALUE_INIT; - gboolean query_ret; - LatencyData data; - GstClockTime our_latency; - - it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (self)); - g_value_init (&ret, G_TYPE_BOOLEAN); - -retry: - data.min = 0; - data.max = GST_CLOCK_TIME_NONE; - data.live = FALSE; - g_value_set_boolean (&ret, TRUE); - - /* query upstream's latency */ - res = gst_iterator_fold (it, query_upstream_latency_fold, &ret, &data); - switch (res) { - case GST_ITERATOR_OK: - g_assert_not_reached (); - break; - case GST_ITERATOR_DONE: - break; - case GST_ITERATOR_ERROR: - g_value_set_boolean (&ret, FALSE); - break; - case GST_ITERATOR_RESYNC: - gst_iterator_resync (it); - goto retry; - default: - g_assert_not_reached (); - break; - } - gst_iterator_free (it); - query_ret = g_value_get_boolean (&ret); if (!query_ret) { GST_WARNING_OBJECT (self, "Latency query failed"); return FALSE; } - SRC_LOCK (self); - our_latency = self->priv->latency; + gst_query_parse_latency (query, &live, &min, &max); - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) { - GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0"); - data.min = 0; + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) { + GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT + ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min)); + return FALSE; } - if (G_UNLIKELY (data.min > data.max)) { - GST_WARNING_OBJECT (self, "Minimum latency is greater than maximum latency " - "(%" G_GINT64_FORMAT " > %" G_GINT64_FORMAT "). " - "Clamping it at the maximum latency", data.min, data.max); - data.min = data.max; + if (min > max && GST_CLOCK_TIME_IS_VALID (max)) { + GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL), + ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %" + GST_TIME_FORMAT ". Add queues or other buffering elements.", + GST_TIME_ARGS (max), GST_TIME_ARGS (min))); + return FALSE; } - self->priv->latency_live = data.live; - self->priv->latency_min = data.min; - self->priv->latency_max = data.max; + our_latency = self->priv->latency; + + self->priv->peer_latency_live = live; + self->priv->peer_latency_min = min; + self->priv->peer_latency_max = max; + self->priv->has_peer_latency = TRUE; /* add our own */ - data.min += our_latency; - data.min += self->priv->sub_latency_min; + min += our_latency; + min += self->priv->sub_latency_min; if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (data.max)) - data.max += self->priv->sub_latency_max; + && GST_CLOCK_TIME_IS_VALID (max)) + max += self->priv->sub_latency_max + our_latency; else - data.max = GST_CLOCK_TIME_NONE; - - if (data.live && data.min > data.max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the current pipeline. " - "Limiting to %" G_GINT64_FORMAT, data.max)); - data.min = data.max; - /* FIXME: This could in theory become negative, but in - * that case all is lost anyway */ - self->priv->latency -= data.min - data.max; - /* FIXME: shouldn't we g_object_notify() the change here? */ - } + max = GST_CLOCK_TIME_NONE; SRC_BROADCAST (self); - SRC_UNLOCK (self); GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT - " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min, - data.max); + " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max); - gst_query_set_latency (query, data.live, data.min, data.max); + gst_query_set_latency (query, live, min, max); return query_ret; } +/* + * MUST be called with the src_lock held. + * + * See gst_aggregator_get_latency() for doc + */ +static GstClockTime +gst_aggregator_get_latency_unlocked (GstAggregator * self) +{ + GstClockTime latency; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0); + + if (!self->priv->has_peer_latency) { + GstQuery *query = gst_query_new_latency (); + gboolean ret; + + ret = gst_aggregator_query_latency_unlocked (self, query); + gst_query_unref (query); + if (!ret) + return GST_CLOCK_TIME_NONE; + } + + if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) + return GST_CLOCK_TIME_NONE; + + /* latency_min is never GST_CLOCK_TIME_NONE by construction */ + latency = self->priv->peer_latency_min; + + /* add our own */ + latency += self->priv->latency; + latency += self->priv->sub_latency_min; + + return latency; +} + /** - * gst_aggregator_get_latency_unlocked: + * gst_aggregator_get_latency: * @self: a #GstAggregator - * @live: (out) (allow-none): whether @self is live - * @min_latency: (out) (allow-none): the configured minimum latency of @self - * @max_latency: (out) (allow-none): the configured maximum latency of @self * - * Retreives the latency values reported by @self in response to the latency - * query. + * Retrieves the latency values reported by @self in response to the latency + * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element + * will not wait for the clock. * * Typically only called by subclasses. * - * MUST be called with the src_lock held. + * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync */ -void -gst_aggregator_get_latency_unlocked (GstAggregator * self, gboolean * live, - GstClockTime * min_latency, GstClockTime * max_latency) +GstClockTime +gst_aggregator_get_latency (GstAggregator * self) { - GstClockTime min, max; - - g_return_if_fail (GST_IS_AGGREGATOR (self)); + GstClockTime ret; - /* latency_min is never GST_CLOCK_TIME_NONE by construction */ - min = self->priv->latency_min; - max = self->priv->latency_max; - - /* add our own */ - min += self->priv->latency; - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (max) - && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)) - max += self->priv->sub_latency_max; - else - max = GST_CLOCK_TIME_NONE; + SRC_LOCK (self); + ret = gst_aggregator_get_latency_unlocked (self); + SRC_UNLOCK (self); - if (live) - *live = self->priv->latency_live; - if (min_latency) - *min_latency = min; - if (max_latency) - *max_latency = max; + return ret; } static gboolean @@ -1316,6 +1789,7 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event) gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, stop_type, stop, NULL); self->priv->seqnum = gst_event_get_seqnum (event); + self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event); @@ -1343,17 +1817,17 @@ gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query) gst_query_set_seeking (query, format, FALSE, 0, -1); res = TRUE; - goto discard; + break; } case GST_QUERY_LATENCY: - return gst_aggregator_query_latency (self, query); - default: + SRC_LOCK (self); + res = gst_aggregator_query_latency_unlocked (self, query); + SRC_UNLOCK (self); break; + default: + return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); } - return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query); - -discard: return res; } @@ -1366,18 +1840,22 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (peer) { - ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); - GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); - gst_object_unref (peer); + if (evdata->only_to_active_pads && aggpad->priv->first_buffer) { + GST_DEBUG_OBJECT (pad, "not sending event to inactive pad"); + ret = TRUE; + } else { + ret = gst_pad_send_event (peer, gst_event_ref (evdata->event)); + GST_DEBUG_OBJECT (pad, "return of event push is %d", ret); + gst_object_unref (peer); + } } if (ret == FALSE) { - if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) - GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); - if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) { GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME); + GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); + if (gst_pad_query (peer, seeking)) { gboolean seekable; @@ -1412,21 +1890,17 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) return FALSE; } -static EventData +static void gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, - GstEvent * event, gboolean flush) + EventData * evdata) { - EventData evdata; - - evdata.event = event; - evdata.result = TRUE; - evdata.flush = flush; - evdata.one_actually_seeked = FALSE; + evdata->result = TRUE; + evdata->one_actually_seeked = FALSE; /* We first need to set all pads as flushing in a first pass * as flush_start flush_stop is sometimes sent synchronously * while we send the seek event */ - if (flush) { + if (evdata->flush) { GList *l; GST_OBJECT_LOCK (self); @@ -1441,11 +1915,9 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self, GST_OBJECT_UNLOCK (self); } - gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata); - - gst_event_unref (event); + gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata); - return evdata; + gst_event_unref (evdata->event); } static gboolean @@ -1457,7 +1929,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) GstSeekType start_type, stop_type; gint64 start, stop; gboolean flush; - EventData evdata; + EventData evdata = { 0, }; GstAggregatorPrivate *priv = self->priv; gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type, @@ -1475,10 +1947,16 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start, stop_type, stop, NULL); + + /* Seeking sets a position */ + self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); /* forward the seek upstream */ - evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush); + evdata.event = event; + evdata.flush = flush; + evdata.only_to_active_pads = FALSE; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); event = NULL; if (!evdata.result || !evdata.one_actually_seeked) { @@ -1496,36 +1974,28 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event) static gboolean gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event) { - EventData evdata; - gboolean res = TRUE; + EventData evdata = { 0, }; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: - { - gst_event_ref (event); - res = gst_aggregator_do_seek (self, event); - gst_event_unref (event); - event = NULL; - goto done; - } + /* _do_seek() unrefs the event. */ + return gst_aggregator_do_seek (self, event); case GST_EVENT_NAVIGATION: - { /* navigation is rather pointless. */ - res = FALSE; gst_event_unref (event); - goto done; - } + return FALSE; default: - { break; - } } - evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE); - res = evdata.result; - -done: - return res; + /* Don't forward QOS events to pads that had no active buffer yet. Otherwise + * they will receive a QOS event that has earliest_time=0 (because we can't + * have negative timestamps), and consider their buffer as too late */ + evdata.event = event; + evdata.flush = FALSE; + evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS; + gst_aggregator_forward_event_to_all_sinkpads (self, &evdata); + return evdata.result; } static gboolean @@ -1575,18 +2045,57 @@ gst_aggregator_src_pad_activate_mode_func (GstPad * pad, } } - /* deactivating */ - GST_INFO_OBJECT (self, "Deactivating srcpad"); - gst_aggregator_stop_srcpad_task (self, FALSE); + /* deactivating */ + GST_INFO_OBJECT (self, "Deactivating srcpad"); + gst_aggregator_stop_srcpad_task (self, FALSE); + + return TRUE; +} + +static gboolean +gst_aggregator_default_sink_query (GstAggregator * self, + GstAggregatorPad * aggpad, GstQuery * query) +{ + GstPad *pad = GST_PAD (aggpad); + + if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) { + GstQuery *decide_query = NULL; + GstAggregatorClass *agg_class; + gboolean ret; + + GST_OBJECT_LOCK (self); + PAD_LOCK (aggpad); + if (G_UNLIKELY (!aggpad->priv->negotiated)) { + GST_DEBUG_OBJECT (self, + "not negotiated yet, can't answer ALLOCATION query"); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + + return FALSE; + } + + if ((decide_query = self->priv->allocation_query)) + gst_query_ref (decide_query); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); - return TRUE; -} + GST_DEBUG_OBJECT (self, + "calling propose allocation with query %" GST_PTR_FORMAT, decide_query); -static gboolean -gst_aggregator_default_sink_query (GstAggregator * self, - GstAggregatorPad * aggpad, GstQuery * query) -{ - GstPad *pad = GST_PAD (aggpad); + agg_class = GST_AGGREGATOR_GET_CLASS (self); + + /* pass the query to the propose_allocation vmethod if any */ + if (agg_class->propose_allocation) + ret = agg_class->propose_allocation (self, aggpad, decide_query, query); + else + ret = FALSE; + + if (decide_query) + gst_query_unref (decide_query); + + GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query); + return ret; + } return gst_pad_query_default (pad, GST_OBJECT (self), query); } @@ -1605,51 +2114,46 @@ gst_aggregator_finalize (GObject * object) /* * gst_aggregator_set_latency_property: * @agg: a #GstAggregator - * @latency: the new latency value. + * @latency: the new latency value (in nanoseconds). * * Sets the new latency value to @latency. This value is used to limit the * amount of time a pad waits for data to appear before considering the pad * as unresponsive. */ static void -gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) +gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency) { gboolean changed; - GstClockTime min, max; g_return_if_fail (GST_IS_AGGREGATOR (self)); g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency)); SRC_LOCK (self); - if (self->priv->latency_live) { - min = self->priv->latency_min; - max = self->priv->latency_max; - /* add our own */ - min += latency; - min += self->priv->sub_latency_min; - if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max) - && GST_CLOCK_TIME_IS_VALID (max)) - max += self->priv->sub_latency_max; - else - max = GST_CLOCK_TIME_NONE; - - if (GST_CLOCK_TIME_IS_VALID (max) && min > max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the latency in the " - "current pipeline. Limiting to %" G_GINT64_FORMAT, max)); - /* FIXME: This could in theory become negative, but in - * that case all is lost anyway */ - latency -= min - max; - /* FIXME: shouldn't we g_object_notify() the change here? */ + changed = (self->priv->latency != latency); + + if (changed) { + GList *item; + + GST_OBJECT_LOCK (self); + /* First lock all the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_LOCK (aggpad); } - } - changed = (self->priv->latency != latency); - self->priv->latency = latency; + self->priv->latency = latency; - if (changed) SRC_BROADCAST (self); + + /* Now wake up the pads */ + for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) { + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data); + PAD_BROADCAST_EVENT (aggpad); + PAD_UNLOCK (aggpad); + } + GST_OBJECT_UNLOCK (self); + } + SRC_UNLOCK (self); if (changed) @@ -1664,16 +2168,16 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) * Gets the latency value. See gst_aggregator_set_latency for * more details. * - * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad * before a pad is deemed unresponsive. A value of -1 means an * unlimited time. */ -static gint64 +static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg) { - gint64 res; + GstClockTime res; - g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1); + g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE); GST_OBJECT_LOCK (agg); res = agg->priv->latency; @@ -1690,7 +2194,13 @@ gst_aggregator_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - gst_aggregator_set_latency_property (agg, g_value_get_int64 (value)); + gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value)); + break; + case PROP_START_TIME_SELECTION: + agg->priv->start_time_selection = g_value_get_enum (value); + break; + case PROP_START_TIME: + agg->priv->start_time = g_value_get_uint64 (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1706,7 +2216,13 @@ gst_aggregator_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_LATENCY: - g_value_set_int64 (value, gst_aggregator_get_latency_property (agg)); + g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg)); + break; + case PROP_START_TIME_SELECTION: + g_value_set_enum (value, agg->priv->start_time_selection); + break; + case PROP_START_TIME: + g_value_set_uint64 (value, agg->priv->start_time); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1727,7 +2243,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator", GST_DEBUG_FG_MAGENTA, "GstAggregator"); - klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD; + klass->finish_buffer = gst_aggregator_default_finish_buffer; klass->sink_event = gst_aggregator_default_sink_event; klass->sink_query = gst_aggregator_default_sink_query; @@ -1735,6 +2251,11 @@ gst_aggregator_class_init (GstAggregatorClass * klass) klass->src_event = gst_aggregator_default_src_event; klass->src_query = gst_aggregator_default_src_query; + klass->create_new_pad = gst_aggregator_default_create_new_pad; + klass->update_src_caps = gst_aggregator_default_update_src_caps; + klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps; + klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps; + gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event); @@ -1748,14 +2269,24 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->finalize = gst_aggregator_finalize; g_object_class_install_property (gobject_class, PROP_LATENCY, - g_param_spec_int64 ("latency", "Buffer latency", + g_param_spec_uint64 ("latency", "Buffer latency", "Additional latency in live mode to allow upstream " "to take longer to produce buffers for the current " - "position", 0, - (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1), + "position (in nanoseconds)", 0, G_MAXUINT64, DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); + g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION, + g_param_spec_enum ("start-time-selection", "Start Time Selection", + "Decides which start time is output", + gst_aggregator_start_time_selection_get_type (), + DEFAULT_START_TIME_SELECTION, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_START_TIME, + g_param_spec_uint64 ("start-time", "Start Time", + "Start time to use if start-time-selection=set", 0, + G_MAXUINT64, + DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -1776,12 +2307,13 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src"); g_return_if_fail (pad_template != NULL); - priv->padcount = -1; + priv->max_padserial = -1; priv->tags_changed = FALSE; - self->priv->latency_live = FALSE; - self->priv->latency_min = self->priv->sub_latency_min = 0; - self->priv->latency_max = self->priv->sub_latency_max = 0; + self->priv->peer_latency_live = FALSE; + self->priv->peer_latency_min = self->priv->sub_latency_min = 0; + self->priv->peer_latency_max = self->priv->sub_latency_max = 0; + self->priv->has_peer_latency = FALSE; gst_aggregator_reset_flow_values (self); self->srcpad = gst_pad_new_from_template (pad_template, "src"); @@ -1796,6 +2328,8 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); self->priv->latency = DEFAULT_LATENCY; + self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; + self->priv->start_time = DEFAULT_START_TIME; g_mutex_init (&self->priv->src_lock); g_cond_init (&self->priv->src_cond); @@ -1829,167 +2363,337 @@ gst_aggregator_get_type (void) return type; } +/* Must be called with SRC lock and PAD lock held */ +static gboolean +gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) +{ + /* Empty queue always has space */ + if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) + return TRUE; + + /* We also want at least two buffers, one is being processed and one is ready + * for the next iteration when we operate in live mode. */ + if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2) + return TRUE; + + /* zero latency, if there is a buffer, it's full */ + if (self->priv->latency == 0) + return FALSE; + + /* Allow no more buffers than the latency */ + return (aggpad->priv->time_level <= self->priv->latency); +} + +/* Must be called with the PAD_LOCK held */ +static void +apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) +{ + GstClockTime timestamp; + + if (GST_BUFFER_DTS_IS_VALID (buffer)) + timestamp = GST_BUFFER_DTS (buffer); + else + timestamp = GST_BUFFER_PTS (buffer); + + if (timestamp == GST_CLOCK_TIME_NONE) { + if (head) + timestamp = aggpad->priv->head_position; + else + timestamp = aggpad->priv->tail_position; + } + + /* add duration */ + if (GST_BUFFER_DURATION_IS_VALID (buffer)) + timestamp += GST_BUFFER_DURATION (buffer); + + if (head) + aggpad->priv->head_position = timestamp; + else + aggpad->priv->tail_position = timestamp; + + update_time_level (aggpad, head); +} + +/* + * Can be called either from the sinkpad's chain function or from the srcpad's + * thread in the case of a buffer synthetized from a GAP event. + * Because of this second case, FLUSH_LOCK can't be used here. + */ + static GstFlowReturn -gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +gst_aggregator_pad_chain_internal (GstAggregator * self, + GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) { - GstBuffer *actual_buf = buffer; - GstAggregator *self = GST_AGGREGATOR (object); - GstAggregatorPrivate *priv = self->priv; - GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object); GstFlowReturn flow_return; + GstClockTime buf_pts; + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer); - PAD_FLUSH_LOCK (aggpad); - - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) + PAD_LOCK (aggpad); + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) goto flushing; - PAD_LOCK (aggpad); - if (aggpad->priv->pending_eos == TRUE) - goto eos; + if (klass->skip_buffer && klass->skip_buffer (aggpad, self, buffer)) + goto skipped; + + PAD_UNLOCK (aggpad); + + buf_pts = GST_BUFFER_PTS (buffer); + + for (;;) { + SRC_LOCK (self); + GST_OBJECT_LOCK (self); + PAD_LOCK (aggpad); + + if (aggpad->priv->first_buffer) { + self->priv->has_peer_latency = FALSE; + aggpad->priv->first_buffer = FALSE; + } + + if ((gst_aggregator_pad_has_space (self, aggpad) || !head) + && aggpad->priv->flow_return == GST_FLOW_OK) { + if (head) + g_queue_push_head (&aggpad->priv->data, buffer); + else + g_queue_push_tail (&aggpad->priv->data, buffer); + apply_buffer (aggpad, buffer, head); + aggpad->priv->num_buffers++; + buffer = NULL; + SRC_BROADCAST (self); + break; + } - while (aggpad->priv->buffer - && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { + flow_return = aggpad->priv->flow_return; + if (flow_return != GST_FLOW_OK) { + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); + goto flushing; + } GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); PAD_WAIT_EVENT (aggpad); + + PAD_UNLOCK (aggpad); } - PAD_UNLOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) - goto flushing; + if (self->priv->first_buffer) { + GstClockTime start_time; + + switch (self->priv->start_time_selection) { + case GST_AGGREGATOR_START_TIME_SELECTION_ZERO: + default: + start_time = 0; + break; + case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: + GST_OBJECT_LOCK (aggpad); + if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) { + start_time = buf_pts; + if (start_time != -1) { + start_time = MAX (start_time, aggpad->priv->head_segment.start); + start_time = + gst_segment_to_running_time (&aggpad->priv->head_segment, + GST_FORMAT_TIME, start_time); + } + } else { + start_time = 0; + GST_WARNING_OBJECT (aggpad, + "Ignoring request of selecting the first start time " + "as the segment is a %s segment instead of a time segment", + gst_format_get_name (aggpad->segment.format)); + } + GST_OBJECT_UNLOCK (aggpad); + break; + case GST_AGGREGATOR_START_TIME_SELECTION_SET: + start_time = self->priv->start_time; + if (start_time == -1) + start_time = 0; + break; + } - if (aggclass->clip) { - aggclass->clip (self, aggpad, buffer, &actual_buf); + if (start_time != -1) { + if (self->segment.position == -1) + self->segment.position = start_time; + else + self->segment.position = MIN (start_time, self->segment.position); + + GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT, + GST_TIME_ARGS (start_time)); + } } - SRC_LOCK (self); - PAD_LOCK (aggpad); - if (aggpad->priv->buffer) - gst_buffer_unref (aggpad->priv->buffer); - aggpad->priv->buffer = actual_buf; PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); - - SRC_BROADCAST (self); + GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); GST_DEBUG_OBJECT (aggpad, "Done chaining"); - GST_OBJECT_LOCK (self); - flow_return = priv->flow_return; - GST_OBJECT_UNLOCK (self); - return flow_return; flushing: - PAD_FLUSH_UNLOCK (aggpad); + PAD_UNLOCK (aggpad); - gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (aggpad, "We are flushing"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer", + gst_flow_get_name (flow_return)); + if (buffer) + gst_buffer_unref (buffer); - return GST_FLOW_FLUSHING; + return flow_return; -eos: +skipped: PAD_UNLOCK (aggpad); - PAD_FLUSH_UNLOCK (aggpad); + GST_DEBUG_OBJECT (aggpad, "Skipped buffer %" GST_PTR_FORMAT, buffer); gst_buffer_unref (buffer); - GST_DEBUG_OBJECT (pad, "We are EOS already..."); - return GST_FLOW_EOS; + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer) +{ + GstFlowReturn ret; + GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); + + PAD_FLUSH_LOCK (aggpad); + + ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object), + aggpad, buffer, TRUE); + + PAD_FLUSH_UNLOCK (aggpad); + + return ret; } static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) { - PAD_UNLOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); goto flushing; } - while (aggpad->priv->buffer - && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + + while (!gst_aggregator_pad_queue_is_empty (aggpad) + && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } - PAD_UNLOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; - } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + PAD_UNLOCK (aggpad); + + return ret; + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); + + return klass->sink_query (self, aggpad, query); + } flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + return FALSE; } -static gboolean +/* Queue serialized events and let the others go through directly. + * The queued events with be handled from the src-pad task in + * gst_aggregator_do_events_and_queries(). + */ +static GstFlowReturn gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { + GstFlowReturn ret = GST_FLOW_OK; + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); - GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS - && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) { + if (GST_EVENT_IS_SERIALIZED (event) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + SRC_LOCK (self); PAD_LOCK (aggpad); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { - PAD_UNLOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; - } - while (aggpad->priv->buffer - && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) { - GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); - PAD_WAIT_EVENT (aggpad); + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { + GST_OBJECT_LOCK (aggpad); + gst_event_copy_segment (event, &aggpad->priv->head_segment); + aggpad->priv->head_position = aggpad->priv->head_segment.position; + update_time_level (aggpad, TRUE); + GST_OBJECT_UNLOCK (aggpad); } + + GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); + g_queue_push_head (&aggpad->priv->data, event); + SRC_BROADCAST (self); PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); + } else { + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); - if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) - goto flushing; + if (!klass->sink_event (self, aggpad, event)) { + /* Copied from GstPad to convert boolean to a GstFlowReturn in + * the event handling func */ + ret = GST_FLOW_ERROR; + } } - return klass->sink_event (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), event); + return ret; flushing: - GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event"); + GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", + gst_flow_get_name (aggpad->priv->flow_return)); + PAD_UNLOCK (aggpad); + SRC_UNLOCK (self); if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); - return FALSE; + + return aggpad->priv->flow_return; } static gboolean gst_aggregator_pad_activate_mode_func (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); if (active == FALSE) { - PAD_LOCK (aggpad); - g_atomic_int_set (&aggpad->priv->flushing, TRUE); - gst_buffer_replace (&aggpad->priv->buffer, NULL); - PAD_BROADCAST_EVENT (aggpad); - PAD_UNLOCK (aggpad); + SRC_LOCK (self); + gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE); + SRC_BROADCAST (self); + SRC_UNLOCK (self); } else { PAD_LOCK (aggpad); - g_atomic_int_set (&aggpad->priv->flushing, FALSE); + aggpad->priv->flow_return = GST_FLOW_OK; PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } @@ -2009,8 +2713,8 @@ gst_aggregator_pad_constructed (GObject * object) gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); - gst_pad_set_event_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func)); + gst_pad_set_event_full_function_full (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); gst_pad_set_activatemode_function (pad, @@ -2034,7 +2738,7 @@ gst_aggregator_pad_dispose (GObject * object) { GstAggregatorPad *pad = (GstAggregatorPad *) object; - gst_aggregator_pad_drop_buffer (pad); + gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE); G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object); } @@ -2058,15 +2762,72 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - pad->priv->buffer = NULL; + g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); g_mutex_init (&pad->priv->lock); + + gst_aggregator_pad_reset_unlocked (pad); + pad->priv->negotiated = FALSE; +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +{ + pad->priv->num_buffers--; + GST_TRACE_OBJECT (pad, "Consuming buffer"); + PAD_BROADCAST_EVENT (pad); +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) +{ + GstAggregator *self = NULL; + GstAggregatorClass *aggclass = NULL; + GstBuffer *buffer = NULL; + + while (pad->priv->clipped_buffer == NULL && + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); + + apply_buffer (pad, buffer, FALSE); + + /* We only take the parent here so that it's not taken if the buffer is + * already clipped or if the queue is empty. + */ + if (self == NULL) { + self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); + if (self == NULL) { + gst_buffer_unref (buffer); + return; + } + + aggclass = GST_AGGREGATOR_GET_CLASS (self); + } + + if (aggclass->clip) { + GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); + + buffer = aggclass->clip (self, pad, buffer); + + if (buffer == NULL) { + gst_aggregator_pad_buffer_consumed (pad); + GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); + } + } + + pad->priv->clipped_buffer = buffer; + } + + if (self) + gst_object_unref (self); } /** - * gst_aggregator_pad_steal_buffer: + * gst_aggregator_pad_pop_buffer: * @pad: the pad to get buffer from * * Steal the ref to the buffer currently queued in @pad. @@ -2075,22 +2836,22 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) * queued. You should unref the buffer after usage. */ GstBuffer * -gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - if (pad->priv->buffer) { - GST_TRACE_OBJECT (pad, "Consuming buffer"); - buffer = pad->priv->buffer; - pad->priv->buffer = NULL; - if (pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } - PAD_BROADCAST_EVENT (pad); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + buffer = pad->priv->clipped_buffer; + + if (buffer) { + pad->priv->clipped_buffer = NULL; + gst_aggregator_pad_buffer_consumed (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } + PAD_UNLOCK (pad); return buffer; @@ -2109,7 +2870,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) { GstBuffer *buf; - buf = gst_aggregator_pad_steal_buffer (pad); + buf = gst_aggregator_pad_pop_buffer (pad); if (buf == NULL) return FALSE; @@ -2119,7 +2880,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) } /** - * gst_aggregator_pad_get_buffer: + * gst_aggregator_pad_peek_buffer: * @pad: the pad to get buffer from * * Returns: (transfer full): A reference to the buffer in @pad or @@ -2127,18 +2888,30 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) * usage. */ GstBuffer * -gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) +gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - if (pad->priv->buffer) - buffer = gst_buffer_ref (pad->priv->buffer); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + if (pad->priv->clipped_buffer) { + buffer = gst_buffer_ref (pad->priv->clipped_buffer); + } else { + buffer = NULL; + } PAD_UNLOCK (pad); return buffer; } +/** + * gst_aggregator_pad_is_eos: + * @pad: an aggregator pad + * + * Returns: %TRUE if the pad is EOS, otherwise %FALSE. + */ gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad) { @@ -2151,7 +2924,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } -/** +#if 0 +/* * gst_aggregator_merge_tags: * @self: a #GstAggregator * @tags: a #GstTagList to merge @@ -2185,6 +2959,7 @@ gst_aggregator_merge_tags (GstAggregator * self, self->priv->tags_changed = TRUE; GST_OBJECT_UNLOCK (self); } +#endif /** * gst_aggregator_set_latency: @@ -2225,3 +3000,53 @@ gst_aggregator_set_latency (GstAggregator * self, gst_message_new_latency (GST_OBJECT_CAST (self))); } } + +/** + * gst_aggregator_get_buffer_pool: + * @self: a #GstAggregator + * + * Returns: (transfer full): the instance of the #GstBufferPool used + * by @trans; free it after use it + */ +GstBufferPool * +gst_aggregator_get_buffer_pool (GstAggregator * self) +{ + GstBufferPool *pool; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL); + + GST_OBJECT_LOCK (self); + pool = self->priv->pool; + if (pool) + gst_object_ref (pool); + GST_OBJECT_UNLOCK (self); + + return pool; +} + +/** + * gst_aggregator_get_allocator: + * @self: a #GstAggregator + * @allocator: (out) (allow-none) (transfer full): the #GstAllocator + * used + * @params: (out) (allow-none) (transfer full): the + * #GstAllocationParams of @allocator + * + * Lets #GstAggregator sub-classes get the memory @allocator + * acquired by the base class and its @params. + * + * Unref the @allocator after use it. + */ +void +gst_aggregator_get_allocator (GstAggregator * self, + GstAllocator ** allocator, GstAllocationParams * params) +{ + g_return_if_fail (GST_IS_AGGREGATOR (self)); + + if (allocator) + *allocator = self->priv->allocator ? + gst_object_ref (self->priv->allocator) : NULL; + + if (params) + *params = self->priv->allocation_params; +}