aggregator: Always handle sync'ed events on output thread
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 9a688dc..f38d92a 100644 (file)
@@ -212,12 +212,14 @@ struct _GstAggregatorPadPrivate
   gboolean first_buffer;
 
   GQueue buffers;
+  GstBuffer *clipped_buffer;
   guint num_buffers;
   GstClockTime head_position;
   GstClockTime tail_position;
   GstClockTime head_time;
   GstClockTime tail_time;
   GstClockTime time_level;
+  GstSegment head_segment;      /* segment before the queue */
 
   gboolean eos;
 
@@ -238,13 +240,14 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
   aggpad->priv->flow_return = GST_FLOW_OK;
   GST_OBJECT_LOCK (aggpad);
   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
-  gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
   GST_OBJECT_UNLOCK (aggpad);
   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
   aggpad->priv->time_level = 0;
+  aggpad->priv->first_buffer = TRUE;
 }
 
 static gboolean
@@ -417,7 +420,8 @@ no_iter:
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
-  return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
+  return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+      pad->priv->clipped_buffer == NULL);
 }
 
 static gboolean
@@ -425,7 +429,8 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 {
   GstAggregatorPad *pad;
   GList *l, *sinkpads;
-  gboolean have_data = TRUE;
+  gboolean have_buffer = TRUE;
+  gboolean have_event = FALSE;
 
   GST_LOG_OBJECT (self, "checking pads");
 
@@ -440,9 +445,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 
     PAD_LOCK (pad);
 
-    if (gst_aggregator_pad_queue_is_empty (pad)) {
+    if (pad->priv->num_buffers == 0) {
+      if (!gst_aggregator_pad_queue_is_empty (pad))
+        have_event = TRUE;
       if (!pad->priv->eos) {
-        have_data = FALSE;
+        have_buffer = FALSE;
 
         /* If not live we need data on all pads, so leave the loop */
         if (!self->priv->peer_latency_live) {
@@ -461,10 +468,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_UNLOCK (pad);
   }
 
-  if (!have_data)
+  if (!have_buffer && !have_event)
     goto pad_not_ready;
 
-  self->priv->first_buffer = FALSE;
+  if (have_buffer)
+    self->priv->first_buffer = FALSE;
 
   GST_OBJECT_UNLOCK (self);
   GST_LOG_OBJECT (self, "pads are ready");
@@ -478,9 +486,13 @@ no_sinkpads:
   }
 pad_not_ready:
   {
-    GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
+    if (have_event)
+      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
+          " but waking up for serialized event");
+    else
+      GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
     GST_OBJECT_UNLOCK (self);
-    return FALSE;
+    return have_event;
   }
 }
 
@@ -738,11 +750,12 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
     event = NULL;
 
     PAD_LOCK (pad);
-    if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+    if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
       pad->priv->pending_eos = FALSE;
       pad->priv->eos = TRUE;
     }
-    if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
+    if (pad->priv->clipped_buffer == NULL &&
+        GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
       event = g_queue_pop_tail (&pad->priv->buffers);
       PAD_BROADCAST_EVENT (pad);
     }
@@ -790,6 +803,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
     item = next;
   }
   aggpad->priv->num_buffers = 0;
+  gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
 
   PAD_BROADCAST_EVENT (aggpad);
   PAD_UNLOCK (aggpad);
@@ -1019,12 +1033,15 @@ update_time_level (GstAggregatorPad * aggpad, gboolean head)
 {
   if (head) {
     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
-        aggpad->clip_segment.format == GST_FORMAT_TIME)
+        aggpad->priv->head_segment.format == GST_FORMAT_TIME)
       aggpad->priv->head_time =
-          gst_segment_to_running_time (&aggpad->clip_segment,
+          gst_segment_to_running_time (&aggpad->priv->head_segment,
           GST_FORMAT_TIME, aggpad->priv->head_position);
     else
       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+
+    if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
+      aggpad->priv->tail_time = aggpad->priv->head_time;
   } else {
     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
         aggpad->segment.format == GST_FORMAT_TIME)
@@ -1096,8 +1113,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
         GST_OBJECT_UNLOCK (self);
       }
 
-      aggpad->priv->first_buffer = TRUE;
-
       /* We never forward the event */
       goto eat;
     }
@@ -1111,7 +1126,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
        */
       SRC_LOCK (self);
       PAD_LOCK (aggpad);
-      if (gst_aggregator_pad_queue_is_empty (aggpad)) {
+      if (aggpad->priv->num_buffers == 0) {
         aggpad->priv->eos = TRUE;
       } else {
         aggpad->priv->pending_eos = TRUE;
@@ -2081,7 +2096,7 @@ static gboolean
 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
 {
   /* Empty queue always has space */
-  if (g_queue_get_length (&aggpad->priv->buffers) == 0)
+  if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
     return TRUE;
 
   /* We also want at least two buffers, one is being processed and one is ready
@@ -2131,8 +2146,6 @@ static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
-  GstBuffer *actual_buf = buffer;
-  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
@@ -2150,16 +2163,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   PAD_UNLOCK (aggpad);
 
-  if (aggclass->clip && head) {
-    aggclass->clip (self, aggpad, buffer, &actual_buf);
-  }
-
-  if (actual_buf == NULL) {
-    GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
-    goto done;
-  }
-
-  buf_pts = GST_BUFFER_PTS (actual_buf);
+  buf_pts = GST_BUFFER_PTS (buffer);
 
   aggpad->priv->first_buffer = FALSE;
 
@@ -2170,12 +2174,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
     if (gst_aggregator_pad_has_space (self, aggpad)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
       if (head)
-        g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+        g_queue_push_head (&aggpad->priv->buffers, buffer);
       else
-        g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
-      apply_buffer (aggpad, actual_buf, head);
+        g_queue_push_tail (&aggpad->priv->buffers, buffer);
+      apply_buffer (aggpad, buffer, head);
       aggpad->priv->num_buffers++;
-      actual_buf = buffer = NULL;
+      buffer = NULL;
       SRC_BROADCAST (self);
       break;
     }
@@ -2204,13 +2208,13 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
         break;
       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
         GST_OBJECT_LOCK (aggpad);
-        if (aggpad->segment.format == GST_FORMAT_TIME) {
+        if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
           start_time = buf_pts;
           if (start_time != -1) {
-            start_time = MAX (start_time, aggpad->segment.start);
+            start_time = MAX (start_time, aggpad->priv->head_segment.start);
             start_time =
-                gst_segment_to_running_time (&aggpad->segment, GST_FORMAT_TIME,
-                start_time);
+                gst_segment_to_running_time (&aggpad->priv->head_segment,
+                GST_FORMAT_TIME, start_time);
           }
         } else {
           start_time = 0;
@@ -2243,8 +2247,6 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
-done:
-
   PAD_FLUSH_UNLOCK (aggpad);
 
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
@@ -2333,14 +2335,13 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
 
     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
       GST_OBJECT_LOCK (aggpad);
-      gst_event_copy_segment (event, &aggpad->clip_segment);
-      aggpad->priv->head_position = aggpad->clip_segment.position;
+      gst_event_copy_segment (event, &aggpad->priv->head_segment);
+      aggpad->priv->head_position = aggpad->priv->head_segment.position;
       update_time_level (aggpad, TRUE);
       GST_OBJECT_UNLOCK (aggpad);
     }
 
-    if (!gst_aggregator_pad_queue_is_empty (aggpad) &&
-        GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
+    if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
       GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
           event);
       g_queue_push_head (&aggpad->priv->buffers, event);
@@ -2464,10 +2465,67 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
   g_mutex_init (&pad->priv->flush_lock);
   g_mutex_init (&pad->priv->lock);
 
-  pad->priv->first_buffer = TRUE;
   gst_aggregator_pad_reset_unlocked (pad);
 }
 
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+{
+  pad->priv->num_buffers--;
+  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+    pad->priv->pending_eos = FALSE;
+    pad->priv->eos = TRUE;
+  }
+  PAD_BROADCAST_EVENT (pad);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
+{
+  GstAggregator *self = NULL;
+  GstAggregatorClass *aggclass;
+  GstBuffer *buffer = NULL;
+
+  while (pad->priv->clipped_buffer == NULL &&
+      GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
+    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+    apply_buffer (pad, buffer, FALSE);
+
+    /* We only take the parent here so that it's not taken if the buffer is
+     * already clipped or if the queue is empty.
+     */
+    if (self == NULL) {
+      self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
+      if (self == NULL) {
+        gst_buffer_unref (buffer);
+        return;
+      }
+
+      aggclass = GST_AGGREGATOR_GET_CLASS (self);
+    }
+
+    if (aggclass->clip) {
+      GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
+
+      buffer = aggclass->clip (self, pad, buffer);
+
+      if (buffer == NULL) {
+        gst_aggregator_pad_buffer_consumed (pad);
+        GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
+      }
+    }
+
+    pad->priv->clipped_buffer = buffer;
+  }
+
+  if (self)
+    gst_object_unref (self);
+}
+
 /**
  * gst_aggregator_pad_steal_buffer:
  * @pad: the pad to get buffer from
@@ -2480,23 +2538,20 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;
 
   PAD_LOCK (pad);
-  if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
-    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+  buffer = pad->priv->clipped_buffer;
+  pad->priv->clipped_buffer = NULL;
 
   if (buffer) {
-    apply_buffer (pad, buffer, FALSE);
-    pad->priv->num_buffers--;
-    GST_TRACE_OBJECT (pad, "Consuming buffer");
-    if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
-      pad->priv->pending_eos = FALSE;
-      pad->priv->eos = TRUE;
-    }
-    PAD_BROADCAST_EVENT (pad);
+    gst_aggregator_pad_buffer_consumed (pad);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
+
   PAD_UNLOCK (pad);
 
   return buffer;
@@ -2535,17 +2590,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 GstBuffer *
 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
 {
-  GstBuffer *buffer = NULL;
+  GstBuffer *buffer;
 
   PAD_LOCK (pad);
-  buffer = g_queue_peek_tail (&pad->priv->buffers);
-  /* The tail should always be a buffer, because if it is an event,
-   * it will be consumed immeditaly in gst_aggregator_steal_buffer */
 
-  if (GST_IS_BUFFER (buffer))
-    gst_buffer_ref (buffer);
-  else
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+
+  if (pad->priv->clipped_buffer) {
+    buffer = gst_buffer_ref (pad->priv->clipped_buffer);
+  } else {
     buffer = NULL;
+  }
   PAD_UNLOCK (pad);
 
   return buffer;