X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=2bad1a10f42bf1c971dcee9763854338a4a606d9;hb=70d0945b3592d0fbcf6345cd9055cbebee619661;hp=26d48a3477187d3929c185bb5281cd0e2ef1c085;hpb=3db4239aab91cc7e769d314a93c5cd966fd285ff;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 26d48a3..2bad1a1 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -21,46 +21,40 @@ */ /** * SECTION: gstaggregator + * @title: GstAggregator * @short_description: manages a set of pads with the purpose of * aggregating their buffers. * @see_also: gstcollectpads for historical reasons. * * Manages a set of pads with the purpose of aggregating their buffers. * Control is given to the subclass when all pads have data. - * - * - * Base class for mixers and muxers. Subclasses should at least implement + * + * * Base class for mixers and muxers. Subclasses should at least implement * the #GstAggregatorClass.aggregate() virtual method. - * - * - * When data is queued on all pads, tha aggregate vmethod is called. - * - * - * One can peek at the data on any given GstAggregatorPad with the + * + * * When data is queued on all pads, the aggregate vmethod is called. + * + * * One can peek at the data on any given GstAggregatorPad with the * gst_aggregator_pad_get_buffer () method, and take ownership of it * with the gst_aggregator_pad_steal_buffer () method. When a buffer * has been taken with steal_buffer (), a new buffer can be queued * on that pad. - * - * - * If the subclass wishes to push a buffer downstream in its aggregate + * + * * If the subclass wishes to push a buffer downstream in its aggregate * implementation, it should do so through the * gst_aggregator_finish_buffer () method. This method will take care * of sending and ordering mandatory events such as stream start, caps * and segment. - * - * - * Same goes for EOS events, which should not be pushed directly by the + * + * * Same goes for EOS events, which should not be pushed directly by the * subclass, it should instead return GST_FLOW_EOS in its aggregate * implementation. - * - * - * Note that the aggregator logic regarding gap event handling is to turn + * + * * Note that the aggregator logic regarding gap event handling is to turn * these into gap buffers with matching PTS and duration. It will also * flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE * to ease their identification and subsequent processing. - * - * + * */ #ifdef HAVE_CONFIG_H @@ -217,13 +211,17 @@ struct _GstAggregatorPadPrivate gboolean first_buffer; - GQueue buffers; + GQueue data; /* buffers, events and queries */ + GstBuffer *clipped_buffer; guint num_buffers; GstClockTime head_position; GstClockTime tail_position; - GstClockTime head_time; + GstClockTime head_time; /* running time */ GstClockTime tail_time; - GstClockTime time_level; + GstClockTime time_level; /* how much head is ahead of tail */ + GstSegment head_segment; /* segment before the queue */ + + gboolean negotiated; gboolean eos; @@ -235,24 +233,32 @@ struct _GstAggregatorPadPrivate GMutex flush_lock; }; -static gboolean -gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) +/* Must be called with PAD_LOCK held */ +static void +gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) { - GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); - - PAD_LOCK (aggpad); aggpad->priv->pending_eos = FALSE; aggpad->priv->eos = FALSE; aggpad->priv->flow_return = GST_FLOW_OK; GST_OBJECT_LOCK (aggpad); gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); - gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED); GST_OBJECT_UNLOCK (aggpad); aggpad->priv->head_position = GST_CLOCK_TIME_NONE; aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; aggpad->priv->head_time = GST_CLOCK_TIME_NONE; aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; aggpad->priv->time_level = 0; + aggpad->priv->first_buffer = TRUE; +} + +static gboolean +gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) +{ + GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad); + + PAD_LOCK (aggpad); + gst_aggregator_pad_reset_unlocked (aggpad); PAD_UNLOCK (aggpad); if (klass->flush) @@ -270,7 +276,7 @@ static GstElementClass *aggregator_parent_class = NULL; struct _GstAggregatorPrivate { - gint padcount; + gint max_padserial; /* Our state is >= PAUSED */ gboolean running; /* protected by src_lock */ @@ -290,7 +296,7 @@ struct _GstAggregatorPrivate gboolean peer_latency_live; /* protected by src_lock */ GstClockTime peer_latency_min; /* protected by src_lock */ GstClockTime peer_latency_max; /* protected by src_lock */ - gboolean has_peer_latency; + gboolean has_peer_latency; /* protected by src_lock */ GstClockTime sub_latency_min; /* protected by src_lock */ GstClockTime sub_latency_max; /* protected by src_lock */ @@ -300,10 +306,16 @@ struct _GstAggregatorPrivate GMutex src_lock; GCond src_cond; - gboolean first_buffer; + gboolean first_buffer; /* protected by object lock */ GstAggregatorStartTimeSelection start_time_selection; GstClockTime start_time; + /* protected by the object lock */ + GstQuery *allocation_query; + GstAllocator *allocator; + GstBufferPool *pool; + GstAllocationParams allocation_params; + /* properties */ gint64 latency; /* protected by both src_lock and all pad locks */ }; @@ -344,6 +356,8 @@ static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, * * This method guarantees that @func will be called only once for each * sink pad. + * + * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE */ gboolean gst_aggregator_iterate_sinkpads (GstAggregator * self, @@ -416,7 +430,8 @@ no_iter: static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { - return (g_queue_peek_tail (&pad->priv->buffers) == NULL); + return (g_queue_peek_tail (&pad->priv->data) == NULL && + pad->priv->clipped_buffer == NULL); } static gboolean @@ -424,6 +439,8 @@ gst_aggregator_check_pads_ready (GstAggregator * self) { GstAggregatorPad *pad; GList *l, *sinkpads; + gboolean have_buffer = TRUE; + gboolean have_event = FALSE; GST_LOG_OBJECT (self, "checking pads"); @@ -438,23 +455,34 @@ gst_aggregator_check_pads_ready (GstAggregator * self) PAD_LOCK (pad); - /* In live mode, having a single pad with buffers is enough to - * generate a start time from it. In non-live mode all pads need - * to have a buffer - */ - if (self->priv->peer_latency_live && - !gst_aggregator_pad_queue_is_empty (pad)) - self->priv->first_buffer = FALSE; + if (pad->priv->num_buffers == 0) { + if (!gst_aggregator_pad_queue_is_empty (pad)) + have_event = TRUE; + if (!pad->priv->eos) { + have_buffer = FALSE; - if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) { - PAD_UNLOCK (pad); - goto pad_not_ready; + /* If not live we need data on all pads, so leave the loop */ + if (!self->priv->peer_latency_live) { + PAD_UNLOCK (pad); + goto pad_not_ready; + } + } + } else if (self->priv->peer_latency_live) { + /* In live mode, having a single pad with buffers is enough to + * generate a start time from it. In non-live mode all pads need + * to have a buffer + */ + self->priv->first_buffer = FALSE; } - PAD_UNLOCK (pad); + PAD_UNLOCK (pad); } - self->priv->first_buffer = FALSE; + if (!have_buffer && !have_event) + goto pad_not_ready; + + if (have_buffer) + self->priv->first_buffer = FALSE; GST_OBJECT_UNLOCK (self); GST_LOG_OBJECT (self, "pads are ready"); @@ -468,9 +496,13 @@ no_sinkpads: } pad_not_ready: { - GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); + if (have_event) + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet," + " but waking up for serialized event"); + else + GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet"); GST_OBJECT_UNLOCK (self); - return FALSE; + return have_event; } } @@ -649,6 +681,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) * and if a pad does not have a buffer in time we ignore * that pad. */ + GST_OBJECT_LOCK (self); if (!GST_CLOCK_TIME_IS_VALID (latency) || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) || !GST_CLOCK_TIME_IS_VALID (start) || @@ -659,6 +692,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) * then check if we're ready now. If we return FALSE, * we will be directly called again. */ + GST_OBJECT_UNLOCK (self); SRC_WAIT (self); } else { GstClockTime base_time, time; @@ -669,11 +703,8 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT, GST_TIME_ARGS (start)); - GST_OBJECT_LOCK (self); base_time = GST_ELEMENT_CAST (self)->base_time; - clock = GST_ELEMENT_CLOCK (self); - if (clock) - gst_object_ref (clock); + clock = gst_object_ref (GST_ELEMENT_CLOCK (self)); GST_OBJECT_UNLOCK (self); time = base_time + start; @@ -683,7 +714,7 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time), - GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time), + GST_TIME_ARGS (base_time), GST_TIME_ARGS (start), GST_TIME_ARGS (latency), GST_TIME_ARGS (gst_clock_get_time (clock))); @@ -701,9 +732,8 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } GST_DEBUG_OBJECT (self, - "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")", - status, (jitter < 0 ? "-" : " "), - GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter))); + "clock returned %d (jitter: %" GST_STIME_FORMAT ")", + status, GST_STIME_ARGS (jitter)); /* we timed out */ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { @@ -720,35 +750,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } static gboolean -check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_do_events_and_queries (GstAggregator * self, + GstAggregatorPad * pad, gpointer user_data) { GstEvent *event = NULL; + GstQuery *query = NULL; GstAggregatorClass *klass = NULL; gboolean *processed_event = user_data; do { event = NULL; + query = NULL; PAD_LOCK (pad); - if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { pad->priv->pending_eos = FALSE; pad->priv->eos = TRUE; } - if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { - event = g_queue_pop_tail (&pad->priv->buffers); - PAD_BROADCAST_EVENT (pad); + if (pad->priv->clipped_buffer == NULL && + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); } PAD_UNLOCK (pad); - if (event) { + if (event || query) { + gboolean ret; + if (processed_event) *processed_event = TRUE; if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - klass->sink_event (self, pad, event); + if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + gst_event_ref (event); + ret = klass->sink_event (self, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); + gst_event_unref (event); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (self, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->data) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->data); + } + } + + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); } - } while (event != NULL); + } while (event || query); return TRUE; } @@ -765,7 +829,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, else aggpad->priv->flow_return = flow_return; - item = g_queue_peek_head_link (&aggpad->priv->buffers); + item = g_queue_peek_head_link (&aggpad->priv->data); while (item) { GList *next = item->next; @@ -776,17 +840,245 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || !GST_EVENT_IS_STICKY (item->data)) { - gst_mini_object_unref (item->data); - g_queue_delete_link (&aggpad->priv->buffers, item); + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); } item = next; } aggpad->priv->num_buffers = 0; + gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); } +static GstFlowReturn +gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps, + GstCaps ** ret) +{ + *ret = gst_caps_ref (caps); + + return GST_FLOW_OK; +} + +static GstCaps * +gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps) +{ + caps = gst_caps_fixate (caps); + + return caps; +} + +static gboolean +gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps) +{ + return TRUE; +} + + +/* takes ownership of the pool, allocator and query */ +static gboolean +gst_aggregator_set_allocation (GstAggregator * self, + GstBufferPool * pool, GstAllocator * allocator, + GstAllocationParams * params, GstQuery * query) +{ + GstAllocator *oldalloc; + GstBufferPool *oldpool; + GstQuery *oldquery; + + GST_DEBUG ("storing allocation query"); + + GST_OBJECT_LOCK (self); + oldpool = self->priv->pool; + self->priv->pool = pool; + + oldalloc = self->priv->allocator; + self->priv->allocator = allocator; + + oldquery = self->priv->allocation_query; + self->priv->allocation_query = query; + + if (params) + self->priv->allocation_params = *params; + else + gst_allocation_params_init (&self->priv->allocation_params); + GST_OBJECT_UNLOCK (self); + + if (oldpool) { + GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool); + gst_buffer_pool_set_active (oldpool, FALSE); + gst_object_unref (oldpool); + } + if (oldalloc) { + gst_object_unref (oldalloc); + } + if (oldquery) { + gst_query_unref (oldquery); + } + return TRUE; +} + + +static gboolean +gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query) +{ + GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); + + if (aggclass->decide_allocation) + if (!aggclass->decide_allocation (self, query)) + return FALSE; + + return TRUE; +} + +static gboolean +gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps) +{ + GstQuery *query; + gboolean result = TRUE; + GstBufferPool *pool = NULL; + GstAllocator *allocator; + GstAllocationParams params; + + /* find a pool for the negotiated caps now */ + GST_DEBUG_OBJECT (self, "doing allocation query"); + query = gst_query_new_allocation (caps, TRUE); + if (!gst_pad_peer_query (self->srcpad, query)) { + /* not a problem, just debug a little */ + GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed"); + } + + GST_DEBUG_OBJECT (self, "calling decide_allocation"); + result = gst_aggregator_decide_allocation (self, query); + + GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result, + query); + + if (!result) + goto no_decide_allocation; + + /* we got configuration from our peer or the decide_allocation method, + * parse them */ + if (gst_query_get_n_allocation_params (query) > 0) { + gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms); + } else { + allocator = NULL; + gst_allocation_params_init (¶ms); + } + + if (gst_query_get_n_allocation_pools (query) > 0) + gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL); + + /* now store */ + result = + gst_aggregator_set_allocation (self, pool, allocator, ¶ms, query); + + return result; + + /* Errors */ +no_decide_allocation: + { + GST_WARNING_OBJECT (self, "Failed to decide allocation"); + gst_query_unref (query); + + return result; + } + +} + +/* WITH SRC_LOCK held */ +static GstFlowReturn +gst_aggregator_update_src_caps (GstAggregator * self) +{ + GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self); + GstCaps *downstream_caps, *template_caps, *caps = NULL; + GstFlowReturn ret = GST_FLOW_OK; + + template_caps = gst_pad_get_pad_template_caps (self->srcpad); + downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps); + + if (gst_caps_is_empty (downstream_caps)) { + GST_INFO_OBJECT (self, "Downstream caps (%" + GST_PTR_FORMAT ") not compatible with pad template caps (%" + GST_PTR_FORMAT ")", downstream_caps, template_caps); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + + g_assert (agg_klass->update_src_caps); + GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT, + downstream_caps); + ret = agg_klass->update_src_caps (self, downstream_caps, &caps); + if (ret < GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Subclass failed to update provided caps"); + goto done; + } + if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) { + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + +#ifdef GST_ENABLE_EXTRA_CHECKS + if (!gst_caps_is_subset (caps, template_caps)) { + GstCaps *intersection; + + GST_ERROR_OBJECT (self, + "update_src_caps returned caps %" GST_PTR_FORMAT + " which are not a real subset of the template caps %" + GST_PTR_FORMAT, caps, template_caps); + g_warning ("%s: update_src_caps returned caps which are not a real " + "subset of the filter caps", GST_ELEMENT_NAME (self)); + + intersection = + gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST); + gst_caps_unref (caps); + caps = intersection; + } +#endif + + if (gst_caps_is_any (caps)) { + goto done; + } + + if (!gst_caps_is_fixed (caps)) { + g_assert (agg_klass->fixate_src_caps); + + GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps); + if (!(caps = agg_klass->fixate_src_caps (self, caps))) { + GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + GST_DEBUG_OBJECT (self, " to %" GST_PTR_FORMAT, caps); + } + + if (agg_klass->negotiated_src_caps) { + if (!agg_klass->negotiated_src_caps (self, caps)) { + GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps"); + ret = GST_FLOW_NOT_NEGOTIATED; + goto done; + } + } + + gst_aggregator_set_src_caps (self, caps); + + if (!gst_aggregator_do_allocation (self, caps)) { + GST_WARNING_OBJECT (self, "Allocation negotiation failed"); + ret = GST_FLOW_NOT_NEGOTIATED; + } + +done: + gst_caps_unref (downstream_caps); + gst_caps_unref (template_caps); + + if (caps) + gst_caps_unref (caps); + + return ret; +} + static void gst_aggregator_aggregate_func (GstAggregator * self) { @@ -801,20 +1093,33 @@ gst_aggregator_aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { - GstFlowReturn flow_return; + GstFlowReturn flow_return = GST_FLOW_OK; gboolean processed_event = FALSE; - gst_aggregator_iterate_sinkpads (self, check_events, NULL); + gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, + NULL); if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, + &processed_event); if (processed_event) continue; - GST_TRACE_OBJECT (self, "Actually aggregating!"); - flow_return = klass->aggregate (self, timeout); + if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) { + flow_return = gst_aggregator_update_src_caps (self); + if (flow_return != GST_FLOW_OK) + gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self)); + } + + if (timeout || flow_return >= GST_FLOW_OK) { + GST_TRACE_OBJECT (self, "Actually aggregating!"); + flow_return = klass->aggregate (self, timeout); + } + + if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) + continue; GST_OBJECT_LOCK (self); if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) { @@ -861,12 +1166,13 @@ gst_aggregator_start (GstAggregator * self) GstAggregatorClass *klass; gboolean result; - self->priv->running = TRUE; self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; self->priv->send_eos = TRUE; self->priv->srccaps = NULL; + gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL); + klass = GST_AGGREGATOR_GET_CLASS (self); if (klass->start) @@ -1012,12 +1318,15 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head) { if (head) { if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && - aggpad->clip_segment.format == GST_FORMAT_TIME) + aggpad->priv->head_segment.format == GST_FORMAT_TIME) aggpad->priv->head_time = - gst_segment_to_running_time (&aggpad->clip_segment, + gst_segment_to_running_time (&aggpad->priv->head_segment, GST_FORMAT_TIME, aggpad->priv->head_position); else aggpad->priv->head_time = GST_CLOCK_TIME_NONE; + + if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time)) + aggpad->priv->tail_time = aggpad->priv->head_time; } else { if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) && aggpad->segment.format == GST_FORMAT_TIME) @@ -1051,6 +1360,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { @@ -1061,8 +1372,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_FLUSH_STOP: { - GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); - gst_aggregator_pad_flush (aggpad, self); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -1089,22 +1398,18 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_OBJECT_UNLOCK (self); } - aggpad->priv->first_buffer = TRUE; - /* We never forward the event */ goto eat; } case GST_EVENT_EOS: { - GST_DEBUG_OBJECT (aggpad, "EOS"); - /* We still have a buffer, and we don't want the subclass to have to * check for it. Mark pending_eos, eos will be set when steal_buffer is * called */ SRC_LOCK (self); PAD_LOCK (aggpad); - if (gst_aggregator_pad_queue_is_empty (aggpad)) { + if (aggpad->priv->num_buffers == 0) { aggpad->priv->eos = TRUE; } else { aggpad->priv->pending_eos = TRUE; @@ -1120,6 +1425,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + /* We've got a new segment, tail_position is now meaningless + * and may interfere with the time_level calculation + */ + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); PAD_UNLOCK (aggpad); @@ -1167,6 +1476,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + /* Remove GAP event so we can replace it with the buffer */ + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); @@ -1212,6 +1525,12 @@ gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, { gst_aggregator_pad_flush (pad, self); + PAD_LOCK (pad); + pad->priv->flow_return = GST_FLOW_FLUSHING; + pad->priv->negotiated = FALSE; + PAD_BROADCAST_EVENT (pad); + PAD_UNLOCK (pad); + return TRUE; } @@ -1240,6 +1559,8 @@ gst_aggregator_stop (GstAggregator * agg) gst_tag_list_unref (agg->priv->tags); agg->priv->tags = NULL; + gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL); + return result; } @@ -1308,47 +1629,73 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad) SRC_UNLOCK (self); } -static GstPad * -gst_aggregator_request_new_pad (GstElement * element, +static GstAggregatorPad * +gst_aggregator_default_create_new_pad (GstAggregator * self, GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) { - GstAggregator *self; GstAggregatorPad *agg_pad; + GstAggregatorPrivate *priv = self->priv; + gint serial = 0; + gchar *name = NULL; - GstElementClass *klass = GST_ELEMENT_GET_CLASS (element); - GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; + if (templ->direction != GST_PAD_SINK) + goto not_sink; - self = GST_AGGREGATOR (element); + if (templ->presence != GST_PAD_REQUEST) + goto not_request; - if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) { - gint serial = 0; - gchar *name = NULL; + GST_OBJECT_LOCK (self); + if (req_name == NULL || strlen (req_name) < 6 + || !g_str_has_prefix (req_name, "sink_")) { + /* no name given when requesting the pad, use next available int */ + serial = ++priv->max_padserial; + } else { + /* parse serial number from requested padname */ + serial = g_ascii_strtoull (&req_name[5], NULL, 10); + if (serial > priv->max_padserial) + priv->max_padserial = serial; + } - GST_OBJECT_LOCK (element); - if (req_name == NULL || strlen (req_name) < 6 - || !g_str_has_prefix (req_name, "sink_")) { - /* no name given when requesting the pad, use next available int */ - priv->padcount++; - } else { - /* parse serial number from requested padname */ - serial = g_ascii_strtoull (&req_name[5], NULL, 10); - if (serial >= priv->padcount) - priv->padcount = serial; - } + name = g_strdup_printf ("sink_%u", serial); + agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, + "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); + g_free (name); - name = g_strdup_printf ("sink_%u", priv->padcount); - agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type, - "name", name, "direction", GST_PAD_SINK, "template", templ, NULL); - g_free (name); + GST_OBJECT_UNLOCK (self); - GST_OBJECT_UNLOCK (element); + return agg_pad; - } else { + /* errors */ +not_sink: + { + GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad"); + return NULL; + } +not_request: + { + GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad"); + return NULL; + } +} + +static GstPad * +gst_aggregator_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps) +{ + GstAggregator *self; + GstAggregatorPad *agg_pad; + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element); + GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv; + + self = GST_AGGREGATOR (element); + + agg_pad = klass->create_new_pad (self, templ, req_name, caps); + if (!agg_pad) { + GST_ERROR_OBJECT (element, "Couldn't create new pad"); return NULL; } GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad)); - self->priv->has_peer_latency = FALSE; if (priv->running) gst_pad_set_active (GST_PAD (agg_pad), TRUE); @@ -1406,18 +1753,6 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query) else max = GST_CLOCK_TIME_NONE; - if (live && min > max) { - GST_ELEMENT_WARNING (self, CORE, NEGOTIATION, - ("%s", "Latency too big"), - ("The requested latency value is too big for the current pipeline. " - "Limiting to %" G_GINT64_FORMAT, max)); - min = max; - /* FIXME: This could in theory become negative, but in - * that case all is lost anyway */ - self->priv->latency -= min - max; - /* FIXME: shouldn't we g_object_notify() the change here? */ - } - SRC_BROADCAST (self); GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT @@ -1570,12 +1905,11 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data) } if (ret == FALSE) { - if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) - GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); - if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) { GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME); + GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event); + if (gst_pad_query (peer, seeking)) { gboolean seekable; @@ -1796,6 +2130,45 @@ gst_aggregator_default_sink_query (GstAggregator * self, { GstPad *pad = GST_PAD (aggpad); + if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) { + GstQuery *decide_query = NULL; + GstAggregatorClass *agg_class; + gboolean ret; + + GST_OBJECT_LOCK (self); + PAD_LOCK (aggpad); + if (G_UNLIKELY (!aggpad->priv->negotiated)) { + GST_DEBUG_OBJECT (self, + "not negotiated yet, can't answer ALLOCATION query"); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + + return FALSE; + } + + if ((decide_query = self->priv->allocation_query)) + gst_query_ref (decide_query); + PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); + + GST_DEBUG_OBJECT (self, + "calling propose allocation with query %" GST_PTR_FORMAT, decide_query); + + agg_class = GST_AGGREGATOR_GET_CLASS (self); + + /* pass the query to the propose_allocation vmethod if any */ + if (agg_class->propose_allocation) + ret = agg_class->propose_allocation (self, aggpad, decide_query, query); + else + ret = FALSE; + + if (decide_query) + gst_query_unref (decide_query); + + GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query); + return ret; + } + return gst_pad_query_default (pad, GST_OBJECT (self), query); } @@ -1867,7 +2240,7 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency) * Gets the latency value. See gst_aggregator_set_latency for * more details. * - * Returns: The time in nanoseconds to wait for data to arrive on a sink pad + * Returns: The time in nanoseconds to wait for data to arrive on a sink pad * before a pad is deemed unresponsive. A value of -1 means an * unlimited time. */ @@ -1950,6 +2323,11 @@ gst_aggregator_class_init (GstAggregatorClass * klass) klass->src_event = gst_aggregator_default_src_event; klass->src_query = gst_aggregator_default_src_query; + klass->create_new_pad = gst_aggregator_default_create_new_pad; + klass->update_src_caps = gst_aggregator_default_update_src_caps; + klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps; + klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps; + gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad); gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event); @@ -1984,6 +2362,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); + GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); } static void @@ -2004,7 +2383,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src"); g_return_if_fail (pad_template != NULL); - priv->padcount = -1; + priv->max_padserial = -1; priv->tags_changed = FALSE; self->priv->peer_latency_live = FALSE; @@ -2065,7 +2444,7 @@ static gboolean gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) { /* Empty queue always has space */ - if (g_queue_get_length (&aggpad->priv->buffers) == 0) + if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) return TRUE; /* We also want at least two buffers, one is being processed and one is ready @@ -2115,8 +2494,6 @@ static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) { - GstBuffer *actual_buf = buffer; - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); GstFlowReturn flow_return; GstClockTime buf_pts; @@ -2132,47 +2509,41 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, if (aggpad->priv->pending_eos == TRUE) goto eos; - flow_return = aggpad->priv->flow_return; - if (flow_return != GST_FLOW_OK) - goto flushing; - PAD_UNLOCK (aggpad); - if (aggclass->clip && head) { - aggclass->clip (self, aggpad, buffer, &actual_buf); - } - - if (actual_buf == NULL) { - GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function"); - goto done; - } - - buf_pts = GST_BUFFER_PTS (actual_buf); - - aggpad->priv->first_buffer = FALSE; + buf_pts = GST_BUFFER_PTS (buffer); for (;;) { SRC_LOCK (self); + GST_OBJECT_LOCK (self); PAD_LOCK (aggpad); + + if (aggpad->priv->first_buffer) { + self->priv->has_peer_latency = FALSE; + aggpad->priv->first_buffer = FALSE; + } + if (gst_aggregator_pad_has_space (self, aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { if (head) - g_queue_push_head (&aggpad->priv->buffers, actual_buf); + g_queue_push_head (&aggpad->priv->data, buffer); else - g_queue_push_tail (&aggpad->priv->buffers, actual_buf); - apply_buffer (aggpad, actual_buf, head); + g_queue_push_tail (&aggpad->priv->data, buffer); + apply_buffer (aggpad, buffer, head); aggpad->priv->num_buffers++; - actual_buf = buffer = NULL; + buffer = NULL; SRC_BROADCAST (self); break; } flow_return = aggpad->priv->flow_return; if (flow_return != GST_FLOW_OK) { + GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); goto flushing; } GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); + GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); PAD_WAIT_EVENT (aggpad); @@ -2188,13 +2559,14 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, start_time = 0; break; case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: - if (aggpad->segment.format == GST_FORMAT_TIME) { + GST_OBJECT_LOCK (aggpad); + if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) { start_time = buf_pts; if (start_time != -1) { - start_time = MAX (start_time, aggpad->segment.start); + start_time = MAX (start_time, aggpad->priv->head_segment.start); start_time = - gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME, - start_time); + gst_segment_to_running_time (&aggpad->priv->head_segment, + GST_FORMAT_TIME, start_time); } } else { start_time = 0; @@ -2203,6 +2575,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, "as the segment is a %s segment instead of a time segment", gst_format_get_name (aggpad->segment.format)); } + GST_OBJECT_UNLOCK (aggpad); break; case GST_AGGREGATOR_START_TIME_SELECTION_SET: start_time = self->priv->start_time; @@ -2223,10 +2596,9 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, } PAD_UNLOCK (aggpad); + GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); -done: - PAD_FLUSH_UNLOCK (aggpad); GST_DEBUG_OBJECT (aggpad, "Done chaining"); @@ -2265,38 +2637,61 @@ static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + while (!gst_aggregator_pad_queue_is_empty (aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); + + return ret; } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + return klass->sink_query (self, aggpad, query); flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + return FALSE; } -static gboolean +static GstFlowReturn gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event) { + GstFlowReturn ret = GST_FLOW_OK; GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); @@ -2307,22 +2702,23 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, PAD_LOCK (aggpad); if (aggpad->priv->flow_return != GST_FLOW_OK - && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) + && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + ret = aggpad->priv->flow_return; goto flushing; + } if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GST_OBJECT_LOCK (aggpad); - gst_event_copy_segment (event, &aggpad->clip_segment); - aggpad->priv->head_position = aggpad->clip_segment.position; + gst_event_copy_segment (event, &aggpad->priv->head_segment); + aggpad->priv->head_position = aggpad->priv->head_segment.position; update_time_level (aggpad, TRUE); GST_OBJECT_UNLOCK (aggpad); } - if (!gst_aggregator_pad_queue_is_empty (aggpad) && - GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { + if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); - g_queue_push_head (&aggpad->priv->buffers, event); + g_queue_push_head (&aggpad->priv->data, event); event = NULL; SRC_BROADCAST (self); } @@ -2330,10 +2726,15 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, SRC_UNLOCK (self); } - if (event) - return klass->sink_event (self, aggpad, event); - else - return TRUE; + if (event) { + if (!klass->sink_event (self, aggpad, event)) { + /* Copied from GstPad to convert boolean to a GstFlowReturn in + * the event handling func */ + ret = GST_FLOW_ERROR; + } + } + + return ret; flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event", @@ -2343,7 +2744,8 @@ flushing: if (GST_EVENT_IS_STICKY (event)) gst_pad_store_sticky_event (pad, event); gst_event_unref (event); - return FALSE; + + return ret; } static gboolean @@ -2380,8 +2782,8 @@ gst_aggregator_pad_constructed (GObject * object) gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain)); - gst_pad_set_event_function (pad, - GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func)); + gst_pad_set_event_full_function_full (pad, + GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL); gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func)); gst_pad_set_activatemode_function (pad, @@ -2429,13 +2831,72 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - g_queue_init (&pad->priv->buffers); + g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); g_mutex_init (&pad->priv->lock); - pad->priv->first_buffer = TRUE; + gst_aggregator_pad_reset_unlocked (pad); + pad->priv->negotiated = FALSE; +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +{ + pad->priv->num_buffers--; + GST_TRACE_OBJECT (pad, "Consuming buffer"); + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->priv->eos = TRUE; + } + PAD_BROADCAST_EVENT (pad); +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) +{ + GstAggregator *self = NULL; + GstAggregatorClass *aggclass = NULL; + GstBuffer *buffer = NULL; + + while (pad->priv->clipped_buffer == NULL && + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); + + apply_buffer (pad, buffer, FALSE); + + /* We only take the parent here so that it's not taken if the buffer is + * already clipped or if the queue is empty. + */ + if (self == NULL) { + self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); + if (self == NULL) { + gst_buffer_unref (buffer); + return; + } + + aggclass = GST_AGGREGATOR_GET_CLASS (self); + } + + if (aggclass->clip) { + GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); + + buffer = aggclass->clip (self, pad, buffer); + + if (buffer == NULL) { + gst_aggregator_pad_buffer_consumed (pad); + GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); + } + } + + pad->priv->clipped_buffer = buffer; + } + + if (self) + gst_object_unref (self); } /** @@ -2450,23 +2911,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) - buffer = g_queue_pop_tail (&pad->priv->buffers); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + buffer = pad->priv->clipped_buffer; if (buffer) { - apply_buffer (pad, buffer, FALSE); - pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); - if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } - PAD_BROADCAST_EVENT (pad); + pad->priv->clipped_buffer = NULL; + gst_aggregator_pad_buffer_consumed (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } + PAD_UNLOCK (pad); return buffer; @@ -2505,17 +2963,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - buffer = g_queue_peek_tail (&pad->priv->buffers); - /* The tail should always be a buffer, because if it is an event, - * it will be consumed immeditaly in gst_aggregator_steal_buffer */ - if (GST_IS_BUFFER (buffer)) - gst_buffer_ref (buffer); - else + gst_aggregator_pad_clip_buffer_unlocked (pad); + + if (pad->priv->clipped_buffer) { + buffer = gst_buffer_ref (pad->priv->clipped_buffer); + } else { buffer = NULL; + } PAD_UNLOCK (pad); return buffer; @@ -2607,3 +3065,53 @@ gst_aggregator_set_latency (GstAggregator * self, gst_message_new_latency (GST_OBJECT_CAST (self))); } } + +/** + * gst_aggregator_get_buffer_pool: + * @self: a #GstAggregator + * + * Returns: (transfer full): the instance of the #GstBufferPool used + * by @trans; free it after use it + */ +GstBufferPool * +gst_aggregator_get_buffer_pool (GstAggregator * self) +{ + GstBufferPool *pool; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL); + + GST_OBJECT_LOCK (self); + pool = self->priv->pool; + if (pool) + gst_object_ref (pool); + GST_OBJECT_UNLOCK (self); + + return pool; +} + +/** + * gst_aggregator_get_allocator: + * @self: a #GstAggregator + * @allocator: (out) (allow-none) (transfer full): the #GstAllocator + * used + * @params: (out) (allow-none) (transfer full): the + * #GstAllocationParams of @allocator + * + * Lets #GstAggregator sub-classes get the memory @allocator + * acquired by the base class and its @params. + * + * Unref the @allocator after use it. + */ +void +gst_aggregator_get_allocator (GstAggregator * self, + GstAllocator ** allocator, GstAllocationParams * params) +{ + g_return_if_fail (GST_IS_AGGREGATOR (self)); + + if (allocator) + *allocator = self->priv->allocator ? + gst_object_ref (self->priv->allocator) : NULL; + + if (params) + *params = self->priv->allocation_params; +}