From 2ce3234aa010c4b3f654f1967206d5aa87433f3a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Olivier=20Cr=C3=AAte?= Date: Wed, 6 Jul 2016 16:39:17 -0400 Subject: [PATCH] aggregator: Delay clipping to output thread This is required because the synchronized events like caps or segments may only be processed on the output thread. https://bugzilla.gnome.org/show_bug.cgi?id=781673 --- libs/gst/base/gstaggregator.c | 132 ++++++++++++++++++++++++++++-------------- libs/gst/base/gstaggregator.h | 2 - 2 files changed, 90 insertions(+), 44 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index d10e6e8..1a95fc9 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -212,12 +212,14 @@ struct _GstAggregatorPadPrivate gboolean first_buffer; GQueue buffers; + GstBuffer *clipped_buffer; guint num_buffers; GstClockTime head_position; GstClockTime tail_position; GstClockTime head_time; GstClockTime tail_time; GstClockTime time_level; + GstSegment head_segment; /* segment before the queue */ gboolean eos; @@ -238,7 +240,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) aggpad->priv->flow_return = GST_FLOW_OK; GST_OBJECT_LOCK (aggpad); gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED); - gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED); + gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED); GST_OBJECT_UNLOCK (aggpad); aggpad->priv->head_position = GST_CLOCK_TIME_NONE; aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; @@ -418,7 +420,8 @@ no_iter: static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { - return (g_queue_peek_tail (&pad->priv->buffers) == NULL); + return (g_queue_peek_tail (&pad->priv->buffers) == NULL && + pad->priv->clipped_buffer == NULL); } static gboolean @@ -751,7 +754,8 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) pad->priv->pending_eos = FALSE; pad->priv->eos = TRUE; } - if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { + if (pad->priv->clipped_buffer == NULL && + GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { event = g_queue_pop_tail (&pad->priv->buffers); PAD_BROADCAST_EVENT (pad); } @@ -799,6 +803,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, item = next; } aggpad->priv->num_buffers = 0; + gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); PAD_BROADCAST_EVENT (aggpad); PAD_UNLOCK (aggpad); @@ -1028,9 +1033,9 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head) { if (head) { if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) && - aggpad->clip_segment.format == GST_FORMAT_TIME) + aggpad->priv->head_segment.format == GST_FORMAT_TIME) aggpad->priv->head_time = - gst_segment_to_running_time (&aggpad->clip_segment, + gst_segment_to_running_time (&aggpad->priv->head_segment, GST_FORMAT_TIME, aggpad->priv->head_position); else aggpad->priv->head_time = GST_CLOCK_TIME_NONE; @@ -2091,7 +2096,7 @@ static gboolean gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) { /* Empty queue always has space */ - if (aggpad->priv->num_buffers == 0) + if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) return TRUE; /* We also want at least two buffers, one is being processed and one is ready @@ -2141,7 +2146,6 @@ static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head) { - GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self); GstFlowReturn flow_return; GstClockTime buf_pts; @@ -2159,15 +2163,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, PAD_UNLOCK (aggpad); - if (aggclass->clip && head) { - buffer = aggclass->clip (self, aggpad, buffer); - } - - if (buffer == NULL) { - GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function"); - goto done; - } - buf_pts = GST_BUFFER_PTS (buffer); aggpad->priv->first_buffer = FALSE; @@ -2213,13 +2208,13 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, break; case GST_AGGREGATOR_START_TIME_SELECTION_FIRST: GST_OBJECT_LOCK (aggpad); - if (aggpad->segment.format == GST_FORMAT_TIME) { + if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) { start_time = buf_pts; if (start_time != -1) { - start_time = MAX (start_time, aggpad->segment.start); + start_time = MAX (start_time, aggpad->priv->head_segment.start); start_time = - gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME, - start_time); + gst_segment_to_running_time (&aggpad->priv->head_segment, + GST_FORMAT_TIME, start_time); } } else { start_time = 0; @@ -2252,8 +2247,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, GST_OBJECT_UNLOCK (self); SRC_UNLOCK (self); -done: - PAD_FLUSH_UNLOCK (aggpad); GST_DEBUG_OBJECT (aggpad, "Done chaining"); @@ -2342,8 +2335,8 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) { GST_OBJECT_LOCK (aggpad); - gst_event_copy_segment (event, &aggpad->clip_segment); - aggpad->priv->head_position = aggpad->clip_segment.position; + gst_event_copy_segment (event, &aggpad->priv->head_segment); + aggpad->priv->head_position = aggpad->priv->head_segment.position; update_time_level (aggpad, TRUE); GST_OBJECT_UNLOCK (aggpad); } @@ -2476,6 +2469,64 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) gst_aggregator_pad_reset_unlocked (pad); } +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +{ + pad->priv->num_buffers--; + GST_TRACE_OBJECT (pad, "Consuming buffer"); + if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) { + pad->priv->pending_eos = FALSE; + pad->priv->eos = TRUE; + } + PAD_BROADCAST_EVENT (pad); +} + +/* Must be called with the PAD_LOCK held */ +static void +gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) +{ + GstAggregator *self = NULL; + GstAggregatorClass *aggclass; + GstBuffer *buffer = NULL; + + while (pad->priv->clipped_buffer == NULL && + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { + buffer = g_queue_pop_tail (&pad->priv->buffers); + + apply_buffer (pad, buffer, FALSE); + + /* We only take the parent here so that it's not taken if the buffer is + * already clipped or if the queue is empty. + */ + if (self == NULL) { + self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); + if (self == NULL) { + gst_buffer_unref (buffer); + return; + } + + aggclass = GST_AGGREGATOR_GET_CLASS (self); + } + + if (aggclass->clip) { + GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); + + buffer = aggclass->clip (self, pad, buffer); + + if (buffer == NULL) { + gst_aggregator_pad_buffer_consumed (pad); + GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); + } + } + + pad->priv->clipped_buffer = buffer; + } + + if (self) + gst_object_unref (self); +} + /** * gst_aggregator_pad_steal_buffer: * @pad: the pad to get buffer from @@ -2488,23 +2539,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) - buffer = g_queue_pop_tail (&pad->priv->buffers); + + gst_aggregator_pad_clip_buffer_unlocked (pad); + + buffer = pad->priv->clipped_buffer; + pad->priv->clipped_buffer = NULL; if (buffer) { - apply_buffer (pad, buffer, FALSE); - pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); - if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { - pad->priv->pending_eos = FALSE; - pad->priv->eos = TRUE; - } - PAD_BROADCAST_EVENT (pad); + gst_aggregator_pad_buffer_consumed (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } + PAD_UNLOCK (pad); return buffer; @@ -2543,17 +2591,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad) GstBuffer * gst_aggregator_pad_get_buffer (GstAggregatorPad * pad) { - GstBuffer *buffer = NULL; + GstBuffer *buffer; PAD_LOCK (pad); - buffer = g_queue_peek_tail (&pad->priv->buffers); - /* The tail should always be a buffer, because if it is an event, - * it will be consumed immeditaly in gst_aggregator_steal_buffer */ - if (GST_IS_BUFFER (buffer)) - gst_buffer_ref (buffer); - else + gst_aggregator_pad_clip_buffer_unlocked (pad); + + if (pad->priv->clipped_buffer) { + buffer = gst_buffer_ref (pad->priv->clipped_buffer); + } else { buffer = NULL; + } PAD_UNLOCK (pad); return buffer; diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index e6238b7..38d2605 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -71,8 +71,6 @@ struct _GstAggregatorPad /* Protected by the OBJECT_LOCK */ GstSegment segment; - /* Segment to use in the clip function, before the queue */ - GstSegment clip_segment; /* < Private > */ GstAggregatorPadPrivate * priv; -- 2.7.4