aggregator: Delay clipping to output thread
authorOlivier Crête <olivier.crete@collabora.com>
Wed, 6 Jul 2016 20:39:17 +0000 (16:39 -0400)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:27 +0000 (15:10 +0000)
This is required because the synchronized events like caps or segments
may only be processed on the output thread.

https://bugzilla.gnome.org/show_bug.cgi?id=781673

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index d10e6e8..1a95fc9 100644 (file)
@@ -212,12 +212,14 @@ struct _GstAggregatorPadPrivate
   gboolean first_buffer;
 
   GQueue buffers;
+  GstBuffer *clipped_buffer;
   guint num_buffers;
   GstClockTime head_position;
   GstClockTime tail_position;
   GstClockTime head_time;
   GstClockTime tail_time;
   GstClockTime time_level;
+  GstSegment head_segment;      /* segment before the queue */
 
   gboolean eos;
 
@@ -238,7 +240,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
   aggpad->priv->flow_return = GST_FLOW_OK;
   GST_OBJECT_LOCK (aggpad);
   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
-  gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
   GST_OBJECT_UNLOCK (aggpad);
   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
@@ -418,7 +420,8 @@ no_iter:
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
-  return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
+  return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+      pad->priv->clipped_buffer == NULL);
 }
 
 static gboolean
@@ -751,7 +754,8 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
       pad->priv->pending_eos = FALSE;
       pad->priv->eos = TRUE;
     }
-    if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
+    if (pad->priv->clipped_buffer == NULL &&
+        GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
       event = g_queue_pop_tail (&pad->priv->buffers);
       PAD_BROADCAST_EVENT (pad);
     }
@@ -799,6 +803,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     item = next;
   }
   aggpad->priv->num_buffers = 0;
+  gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
 
   PAD_BROADCAST_EVENT (aggpad);
   PAD_UNLOCK (aggpad);
@@ -1028,9 +1033,9 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head)
 {
   if (head) {
     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
-        aggpad->clip_segment.format == GST_FORMAT_TIME)
+        aggpad->priv->head_segment.format == GST_FORMAT_TIME)
       aggpad->priv->head_time =
-          gst_segment_to_running_time (&aggpad->clip_segment,
+          gst_segment_to_running_time (&aggpad->priv->head_segment,
           GST_FORMAT_TIME, aggpad->priv->head_position);
     else
       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
@@ -2091,7 +2096,7 @@ static gboolean
 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
 {
   /* Empty queue always has space */
-  if (aggpad->priv->num_buffers == 0)
+  if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
     return TRUE;
 
   /* We also want at least two buffers, one is being processed and one is ready
@@ -2141,7 +2146,6 @@ static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
-  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
@@ -2159,15 +2163,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   PAD_UNLOCK (aggpad);
 
-  if (aggclass->clip && head) {
-    buffer = aggclass->clip (self, aggpad, buffer);
-  }
-
-  if (buffer == NULL) {
-    GST_LOG_OBJECT (aggpad, "Buffer dropped by clip function");
-    goto done;
-  }
-
   buf_pts = GST_BUFFER_PTS (buffer);
 
   aggpad->priv->first_buffer = FALSE;
@@ -2213,13 +2208,13 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
         break;
       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
         GST_OBJECT_LOCK (aggpad);
-        if (aggpad->segment.format == GST_FORMAT_TIME) {
+        if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
           start_time = buf_pts;
           if (start_time != -1) {
-            start_time = MAX (start_time, aggpad->segment.start);
+            start_time = MAX (start_time, aggpad->priv->head_segment.start);
             start_time =
-                gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
-                start_time);
+                gst_segment_to_running_time (&aggpad->priv->head_segment,
+                GST_FORMAT_TIME, start_time);
           }
         } else {
           start_time = 0;
@@ -2252,8 +2247,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
-done:
-
   PAD_FLUSH_UNLOCK (aggpad);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
@@ -2342,8 +2335,8 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
 
     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
       GST_OBJECT_LOCK (aggpad);
-      gst_event_copy_segment (event, &aggpad->clip_segment);
-      aggpad->priv->head_position = aggpad->clip_segment.position;
+      gst_event_copy_segment (event, &aggpad->priv->head_segment);
+      aggpad->priv->head_position = aggpad->priv->head_segment.position;
       update_time_level (aggpad, TRUE);
       GST_OBJECT_UNLOCK (aggpad);
     }
@@ -2476,6 +2469,64 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
   gst_aggregator_pad_reset_unlocked (pad);
 }
 
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+{
+  pad->priv->num_buffers--;
+  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+    pad->priv->pending_eos = FALSE;
+    pad->priv->eos = TRUE;
+  }
+  PAD_BROADCAST_EVENT (pad);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
+{
+  GstAggregator *self = NULL;
+  GstAggregatorClass *aggclass;
+  GstBuffer *buffer = NULL;
+
+  while (pad->priv->clipped_buffer == NULL &&
+      GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
+    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+    apply_buffer (pad, buffer, FALSE);
+
+    /* We only take the parent here so that it's not taken if the buffer is
+     * already clipped or if the queue is empty.
+     */
+    if (self == NULL) {
+      self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
+      if (self == NULL) {
+        gst_buffer_unref (buffer);
+        return;
+      }
+
+      aggclass = GST_AGGREGATOR_GET_CLASS (self);
+    }
+
+    if (aggclass->clip) {
+      GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
+
+      buffer = aggclass->clip (self, pad, buffer);
+
+      if (buffer == NULL) {
+        gst_aggregator_pad_buffer_consumed (pad);
+        GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
+      }
+    }
+
+    pad->priv->clipped_buffer = buffer;
+  }
+
+  if (self)
+    gst_object_unref (self);
+}
+
 /**
  * gst_aggregator_pad_steal_buffer:
  * @pad: the pad to get buffer from
@@ -2488,23 +2539,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;
 
   PAD_LOCK (pad);
-  if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
-    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+  buffer = pad->priv->clipped_buffer;
+  pad->priv->clipped_buffer = NULL;
 
   if (buffer) {
-    apply_buffer (pad, buffer, FALSE);
-    pad->priv->num_buffers--;
-    GST_TRACE_OBJECT (pad, "Consuming buffer");
-    if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
-      pad->priv->pending_eos = FALSE;
-      pad->priv->eos = TRUE;
-    }
-    PAD_BROADCAST_EVENT (pad);
+    gst_aggregator_pad_buffer_consumed (pad);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
+
   PAD_UNLOCK (pad);
 
   return buffer;
@@ -2543,17 +2591,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;
 
   PAD_LOCK (pad);
-  buffer = g_queue_peek_tail (&pad->priv->buffers);
-  /* The tail should always be a buffer, because if it is an event,
-   * it will be consumed immeditaly in gst_aggregator_steal_buffer */
 
-  if (GST_IS_BUFFER (buffer))
-    gst_buffer_ref (buffer);
-  else
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+  if (pad->priv->clipped_buffer) {
+    buffer = gst_buffer_ref (pad->priv->clipped_buffer);
+  } else {
     buffer = NULL;
+  }
   PAD_UNLOCK (pad);
 
   return buffer;
index e6238b7..38d2605 100644 (file)
@@ -71,8 +71,6 @@ struct _GstAggregatorPad
 
   /* Protected by the OBJECT_LOCK */
   GstSegment segment;
-  /* Segment to use in the clip function, before the queue */
-  GstSegment clip_segment;
 
   /* < Private > */
   GstAggregatorPadPrivate   *  priv;