aggregator: Queue "latency" buffers at each sink pad.
authorOlivier Crête <olivier.crete@collabora.com>
Sat, 7 Mar 2015 00:50:08 +0000 (19:50 -0500)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
In the case where you have a source giving the GstAggregator smaller
buffers than it uses, when it reaches a timeout, it will consume the
first buffer, then try to read another buffer for the pad. If the
previous element is not fast enough, it may get the next buffer even
though it may be queued just before. To prevent that race, the easiest
solution is to move the queue inside the GstAggregatorPad itself. It
also means that there is no need for strange code cause by increasing
the min latency without increasing the max latency proportionally.

This also means queuing the synchronized events and possibly acting
on them on the src task.

https://bugzilla.gnome.org/show_bug.cgi?id=745768

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index 6cb73c0..a0a8776 100644 (file)
@@ -215,7 +215,13 @@ struct _GstAggregatorPadPrivate
   gboolean pending_flush_stop;
   gboolean pending_eos;
 
-  GstBuffer *buffer;
+  GQueue buffers;
+  GstClockTime head_position;
+  GstClockTime tail_position;
+  GstClockTime head_time;
+  GstClockTime tail_time;
+  GstClockTime time_level;
+
   gboolean eos;
 
   GMutex lock;
@@ -235,6 +241,15 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   aggpad->priv->pending_eos = FALSE;
   aggpad->priv->eos = FALSE;
   aggpad->priv->flow_return = GST_FLOW_OK;
+  GST_OBJECT_LOCK (aggpad);
+  gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
+  gst_segment_init (&aggpad->clip_segment, GST_FORMAT_UNDEFINED);
+  GST_OBJECT_UNLOCK (aggpad);
+  aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
+  aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
+  aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+  aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
+  aggpad->priv->time_level = 0;
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
@@ -287,7 +302,7 @@ struct _GstAggregatorPrivate
   GstClockTime start_time;
 
   /* properties */
-  gint64 latency;
+  gint64 latency;               /* protected by both src_lock and all pad locks */
 };
 
 typedef struct
@@ -312,6 +327,9 @@ enum
   PROP_LAST
 };
 
+static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
+    GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
+
 /**
  * gst_aggregator_iterate_sinkpads:
  * @self: The #GstAggregator
@@ -392,6 +410,12 @@ no_iter:
 }
 
 static gboolean
+gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
+{
+  return (g_queue_peek_tail (&pad->priv->buffers) == NULL);
+}
+
+static gboolean
 gst_aggregator_check_pads_ready (GstAggregator * self)
 {
   GstAggregatorPad *pad;
@@ -414,10 +438,11 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
      * generate a start time from it. In non-live mode all pads need
      * to have a buffer
      */
-    if (self->priv->peer_latency_live && pad->priv->buffer)
+    if (self->priv->peer_latency_live &&
+        !gst_aggregator_pad_queue_is_empty (pad))
       self->priv->first_buffer = FALSE;
 
-    if (pad->priv->buffer == NULL && !pad->priv->eos) {
+    if (gst_aggregator_pad_queue_is_empty (pad) && !pad->priv->eos) {
       PAD_UNLOCK (pad);
       goto pad_not_ready;
     }
@@ -690,16 +715,69 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   return res;
 }
 
+static gboolean
+check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
+{
+  GstEvent *event = NULL;
+  GstAggregatorClass *klass = NULL;
+  gboolean *processed_event = user_data;
+
+  do {
+    event = NULL;
+
+    PAD_LOCK (pad);
+    if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
+      pad->priv->pending_eos = FALSE;
+      pad->priv->eos = TRUE;
+    }
+    if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
+      event = g_queue_pop_tail (&pad->priv->buffers);
+      PAD_BROADCAST_EVENT (pad);
+    }
+    PAD_UNLOCK (pad);
+    if (event) {
+      if (processed_event)
+        *processed_event = TRUE;
+      if (klass == NULL)
+        klass = GST_AGGREGATOR_GET_CLASS (self);
+
+      GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
+      klass->sink_event (self, pad, event);
+    }
+  } while (event != NULL);
+
+  return TRUE;
+}
+
 static void
 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
-    GstFlowReturn flow_return)
+    GstFlowReturn flow_return, gboolean full)
 {
+  GList *item;
+
   PAD_LOCK (aggpad);
   if (flow_return == GST_FLOW_NOT_LINKED)
     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
   else
     aggpad->priv->flow_return = flow_return;
-  gst_buffer_replace (&aggpad->priv->buffer, NULL);
+
+  item = g_queue_peek_head_link (&aggpad->priv->buffers);
+  while (item) {
+    GList *next = item->next;
+
+    /* In partial flush, we do like the pad, we get rid of non-sticky events
+     * and EOS/SEGMENT.
+     */
+    if (full || GST_IS_BUFFER (item->data) ||
+        GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
+        GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
+        !GST_EVENT_IS_STICKY (item->data)) {
+      gst_mini_object_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->buffers, item);
+    }
+    item = next;
+  }
+
   PAD_BROADCAST_EVENT (aggpad);
   PAD_UNLOCK (aggpad);
 }
@@ -719,10 +797,17 @@ gst_aggregator_aggregate_func (GstAggregator * self)
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && priv->running) {
     GstFlowReturn flow_return;
+    gboolean processed_event = FALSE;
+
+    gst_aggregator_iterate_sinkpads (self, check_events, NULL);
 
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
+    gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
+    if (processed_event)
+      continue;
+
     GST_TRACE_OBJECT (self, "Actually aggregating!");
     flow_return = klass->aggregate (self, timeout);
 
@@ -748,7 +833,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
 
-        gst_aggregator_pad_set_flushing (aggpad, flow_return);
+        gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
       }
       GST_OBJECT_UNLOCK (self);
       break;
@@ -879,7 +964,7 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
   GstAggregatorPrivate *priv = self->priv;
   GstAggregatorPadPrivate *padpriv = aggpad->priv;
 
-  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
+  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
 
   PAD_FLUSH_LOCK (aggpad);
   PAD_LOCK (aggpad);
@@ -914,10 +999,44 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
     gst_event_unref (event);
   }
   PAD_FLUSH_UNLOCK (aggpad);
+}
+
+/* Must be called with the the PAD_LOCK held */
+static void
+update_time_level (GstAggregatorPad * aggpad, gboolean head)
+{
+  if (head) {
+    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
+        aggpad->clip_segment.format == GST_FORMAT_TIME)
+      aggpad->priv->head_time =
+          gst_segment_to_running_time (&aggpad->clip_segment,
+          GST_FORMAT_TIME, aggpad->priv->head_position);
+    else
+      aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+  } else {
+    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
+        aggpad->segment.format == GST_FORMAT_TIME)
+      aggpad->priv->tail_time =
+          gst_segment_to_running_time (&aggpad->segment,
+          GST_FORMAT_TIME, aggpad->priv->tail_position);
+    else
+      aggpad->priv->tail_time = aggpad->priv->head_time;
+  }
+
+  if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
+      aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
+    aggpad->priv->time_level = 0;
+    return;
+  }
 
-  gst_aggregator_pad_drop_buffer (aggpad);
+  if (aggpad->priv->tail_time > aggpad->priv->head_time)
+    aggpad->priv->time_level = 0;
+  else
+    aggpad->priv->time_level = aggpad->priv->head_time -
+        aggpad->priv->tail_time;
 }
 
+
 /* GstAggregator vmethods default implementations */
 static gboolean
 gst_aggregator_default_sink_event (GstAggregator * self,
@@ -978,7 +1097,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
        */
       SRC_LOCK (self);
       PAD_LOCK (aggpad);
-      if (!aggpad->priv->buffer) {
+      if (gst_aggregator_pad_queue_is_empty (aggpad)) {
         aggpad->priv->eos = TRUE;
       } else {
         aggpad->priv->pending_eos = TRUE;
@@ -991,9 +1110,12 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     }
     case GST_EVENT_SEGMENT:
     {
+      PAD_LOCK (aggpad);
       GST_OBJECT_LOCK (aggpad);
       gst_event_copy_segment (event, &aggpad->segment);
+      update_time_level (aggpad, FALSE);
       GST_OBJECT_UNLOCK (aggpad);
+      PAD_UNLOCK (aggpad);
 
       GST_OBJECT_LOCK (self);
       self->priv->seqnum = gst_event_get_seqnum (event);
@@ -1006,19 +1128,40 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     }
     case GST_EVENT_GAP:
     {
-      GstClockTime pts;
+      GstClockTime pts, endpts;
       GstClockTime duration;
       GstBuffer *gapbuf;
 
       gst_event_parse_gap (event, &pts, &duration);
       gapbuf = gst_buffer_new ();
 
+      if (GST_CLOCK_TIME_IS_VALID (duration))
+        endpts = pts + duration;
+      else
+        endpts = GST_CLOCK_TIME_NONE;
+
+      GST_OBJECT_LOCK (aggpad);
+      res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
+          &pts, &endpts);
+      GST_OBJECT_UNLOCK (aggpad);
+
+      if (!res) {
+        GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
+        goto eat;
+      }
+
+      if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
+        duration = endpts - pts;
+      else
+        duration = GST_CLOCK_TIME_NONE;
+
       GST_BUFFER_PTS (gapbuf) = pts;
       GST_BUFFER_DURATION (gapbuf) = duration;
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
 
-      if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) {
+      if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
+          GST_FLOW_OK) {
         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
         res = FALSE;
       }
@@ -1150,7 +1293,7 @@ gst_aggregator_release_pad (GstElement * element, GstPad * pad)
   GST_INFO_OBJECT (pad, "Removing pad");
 
   SRC_LOCK (self);
-  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
+  gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
   gst_element_remove_pad (element, pad);
 
   self->priv->has_peer_latency = FALSE;
@@ -1252,7 +1395,7 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
   min += self->priv->sub_latency_min;
   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
       && GST_CLOCK_TIME_IS_VALID (max))
-    max += self->priv->sub_latency_max;
+    max += self->priv->sub_latency_max + our_latency;
   else
     max = GST_CLOCK_TIME_NONE;
 
@@ -1660,41 +1803,36 @@ static void
 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
 {
   gboolean changed;
-  GstClockTime min, max;
 
   g_return_if_fail (GST_IS_AGGREGATOR (self));
   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
 
   SRC_LOCK (self);
-  if (self->priv->peer_latency_live) {
-    min = self->priv->peer_latency_min;
-    max = self->priv->peer_latency_max;
-    /* add our own */
-    min += latency;
-    min += self->priv->sub_latency_min;
-    if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
-        && GST_CLOCK_TIME_IS_VALID (max))
-      max += self->priv->sub_latency_max;
-    else
-      max = GST_CLOCK_TIME_NONE;
-
-    if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
-      GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
-          ("%s", "Latency too big"),
-          ("The requested latency value is too big for the latency in the "
-              "current pipeline.  Limiting to %" G_GINT64_FORMAT, max));
-      /* FIXME: This could in theory become negative, but in
-       * that case all is lost anyway */
-      latency -= min - max;
-      /* FIXME: shouldn't we g_object_notify() the change here? */
+  changed = (self->priv->latency != latency);
+
+  if (changed) {
+    GList *item;
+
+    GST_OBJECT_LOCK (self);
+    /* First lock all the pads */
+    for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
+      GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
+      PAD_LOCK (aggpad);
     }
-  }
 
-  changed = (self->priv->latency != latency);
-  self->priv->latency = latency;
+    self->priv->latency = latency;
 
-  if (changed)
     SRC_BROADCAST (self);
+
+    /* Now wake up the pads */
+    for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
+      GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
+      PAD_BROADCAST_EVENT (aggpad);
+      PAD_UNLOCK (aggpad);
+    }
+    GST_OBJECT_UNLOCK (self);
+  }
+
   SRC_UNLOCK (self);
 
   if (changed)
@@ -1902,14 +2040,60 @@ gst_aggregator_get_type (void)
   return type;
 }
 
+/* Must be called with PAD lock held */
+static gboolean
+gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
+{
+  /* Empty queue always has space */
+  if (g_queue_get_length (&aggpad->priv->buffers) == 0)
+    return TRUE;
+
+  /* zero latency, if there is a buffer, it's full */
+  if (self->priv->latency == 0)
+    return FALSE;
+
+  /* Allow no more buffers than the latency */
+  return (aggpad->priv->time_level <= self->priv->latency);
+}
+
+/* Must be called with the PAD_LOCK held */
+static void
+apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
+{
+  GstClockTime timestamp;
+
+  if (GST_BUFFER_DTS_IS_VALID (buffer))
+    timestamp = GST_BUFFER_DTS (buffer);
+  else
+    timestamp = GST_BUFFER_PTS (buffer);
+
+  if (timestamp == GST_CLOCK_TIME_NONE) {
+    if (head)
+      timestamp = aggpad->priv->head_position;
+    else
+      timestamp = aggpad->priv->tail_position;
+  }
+
+  /* add duration */
+  if (GST_BUFFER_DURATION_IS_VALID (buffer))
+    timestamp += GST_BUFFER_DURATION (buffer);
+
+  if (head)
+    aggpad->priv->head_position = timestamp;
+  else
+    aggpad->priv->tail_position = timestamp;
+
+  update_time_level (aggpad, head);
+}
+
 static GstFlowReturn
-gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
+gst_aggregator_pad_chain_internal (GstAggregator * self,
+    GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
 {
   GstBuffer *actual_buf = buffer;
-  GstAggregator *self = GST_AGGREGATOR (object);
-  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
+  GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
   GstFlowReturn flow_return;
+  GstClockTime buf_pts;
 
   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
 
@@ -1923,26 +2107,49 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   if (aggpad->priv->pending_eos == TRUE)
     goto eos;
 
-  while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
-    PAD_WAIT_EVENT (aggpad);
-
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
     goto flushing;
 
   PAD_UNLOCK (aggpad);
 
-  if (aggclass->clip) {
+  if (aggclass->clip && head) {
     aggclass->clip (self, aggpad, buffer, &actual_buf);
   }
 
-  SRC_LOCK (self);
-  PAD_LOCK (aggpad);
-  if (aggpad->priv->buffer)
-    gst_buffer_unref (aggpad->priv->buffer);
-  aggpad->priv->buffer = actual_buf;
+  if (actual_buf == NULL) {
+    GST_LOG_OBJECT (actual_buf, "Buffer dropped by clip function");
+    goto done;
+  }
 
-  flow_return = aggpad->priv->flow_return;
+  buf_pts = GST_BUFFER_PTS (actual_buf);
+
+  for (;;) {
+    SRC_LOCK (self);
+    PAD_LOCK (aggpad);
+    if (gst_aggregator_pad_has_space (self, aggpad)
+        && aggpad->priv->flow_return == GST_FLOW_OK) {
+      if (head)
+        g_queue_push_head (&aggpad->priv->buffers, actual_buf);
+      else
+        g_queue_push_tail (&aggpad->priv->buffers, actual_buf);
+      apply_buffer (aggpad, actual_buf, head);
+      actual_buf = buffer = NULL;
+      SRC_BROADCAST (self);
+      break;
+    }
+
+    flow_return = aggpad->priv->flow_return;
+    if (flow_return != GST_FLOW_OK) {
+      SRC_UNLOCK (self);
+      goto flushing;
+    }
+    GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
+    SRC_UNLOCK (self);
+    PAD_WAIT_EVENT (aggpad);
+
+    PAD_UNLOCK (aggpad);
+  }
 
   if (self->priv->first_buffer) {
     GstClockTime start_time;
@@ -1954,7 +2161,7 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
         break;
       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
         if (aggpad->segment.format == GST_FORMAT_TIME) {
-          start_time = GST_BUFFER_PTS (actual_buf);
+          start_time = buf_pts;
           if (start_time != -1) {
             start_time = MAX (start_time, aggpad->segment.start);
             start_time =
@@ -1990,11 +2197,12 @@ gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
   }
 
   PAD_UNLOCK (aggpad);
-  PAD_FLUSH_UNLOCK (aggpad);
-
-  SRC_BROADCAST (self);
   SRC_UNLOCK (self);
 
+done:
+
+  PAD_FLUSH_UNLOCK (aggpad);
+
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
   return flow_return;
@@ -2003,9 +2211,10 @@ flushing:
   PAD_UNLOCK (aggpad);
   PAD_FLUSH_UNLOCK (aggpad);
 
-  gst_buffer_unref (buffer);
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
       gst_flow_get_name (flow_return));
+  if (buffer)
+    gst_buffer_unref (buffer);
 
   return flow_return;
 
@@ -2014,11 +2223,18 @@ eos:
   PAD_FLUSH_UNLOCK (aggpad);
 
   gst_buffer_unref (buffer);
-  GST_DEBUG_OBJECT (pad, "We are EOS already...");
+  GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
 
   return GST_FLOW_EOS;
 }
 
+static GstFlowReturn
+gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
+{
+  return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+      GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+}
+
 static gboolean
 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
     GstQuery * query)
@@ -2029,8 +2245,11 @@ gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
   if (GST_QUERY_IS_SERIALIZED (query)) {
     PAD_LOCK (aggpad);
 
-    while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
+    while (!gst_aggregator_pad_queue_is_empty (aggpad)
+        && aggpad->priv->flow_return == GST_FLOW_OK) {
+      GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
       PAD_WAIT_EVENT (aggpad);
+    }
 
     if (aggpad->priv->flow_return != GST_FLOW_OK)
       goto flushing;
@@ -2052,31 +2271,49 @@ static gboolean
 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
     GstEvent * event)
 {
+  GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
-      && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
+      /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
+    SRC_LOCK (self);
     PAD_LOCK (aggpad);
 
-
-    while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
-      PAD_WAIT_EVENT (aggpad);
-
     if (aggpad->priv->flow_return != GST_FLOW_OK
         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
       goto flushing;
 
+    if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
+      GST_OBJECT_LOCK (aggpad);
+      gst_event_copy_segment (event, &aggpad->clip_segment);
+      aggpad->priv->head_position = aggpad->clip_segment.position;
+      update_time_level (aggpad, TRUE);
+      GST_OBJECT_UNLOCK (aggpad);
+    }
+
+    if (!gst_aggregator_pad_queue_is_empty (aggpad) &&
+        GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
+      GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
+          event);
+      g_queue_push_head (&aggpad->priv->buffers, event);
+      event = NULL;
+      SRC_BROADCAST (self);
+    }
     PAD_UNLOCK (aggpad);
+    SRC_UNLOCK (self);
   }
 
-  return klass->sink_event (GST_AGGREGATOR (parent),
-      GST_AGGREGATOR_PAD (pad), event);
+  if (event)
+    return klass->sink_event (self, aggpad, event);
+  else
+    return TRUE;
 
 flushing:
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
       gst_flow_get_name (aggpad->priv->flow_return));
   PAD_UNLOCK (aggpad);
+  SRC_UNLOCK (self);
   if (GST_EVENT_IS_STICKY (event))
     gst_pad_store_sticky_event (pad, event);
   gst_event_unref (event);
@@ -2087,10 +2324,14 @@ static gboolean
 gst_aggregator_pad_activate_mode_func (GstPad * pad,
     GstObject * parent, GstPadMode mode, gboolean active)
 {
+  GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
 
   if (active == FALSE) {
-    gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
+    SRC_LOCK (self);
+    gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
+    SRC_BROADCAST (self);
+    SRC_UNLOCK (self);
   } else {
     PAD_LOCK (aggpad);
     aggpad->priv->flow_return = GST_FLOW_OK;
@@ -2138,7 +2379,7 @@ gst_aggregator_pad_dispose (GObject * object)
 {
   GstAggregatorPad *pad = (GstAggregatorPad *) object;
 
-  gst_aggregator_pad_drop_buffer (pad);
+  gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
 
   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
 }
@@ -2162,7 +2403,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
       GstAggregatorPadPrivate);
 
-  pad->priv->buffer = NULL;
+  g_queue_init (&pad->priv->buffers);
   g_cond_init (&pad->priv->event_cond);
 
   g_mutex_init (&pad->priv->flush_lock);
@@ -2184,11 +2425,13 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
   GstBuffer *buffer = NULL;
 
   PAD_LOCK (pad);
-  if (pad->priv->buffer) {
+  if (GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers)))
+    buffer = g_queue_pop_tail (&pad->priv->buffers);
+
+  if (buffer) {
+    apply_buffer (pad, buffer, FALSE);
     GST_TRACE_OBJECT (pad, "Consuming buffer");
-    buffer = pad->priv->buffer;
-    pad->priv->buffer = NULL;
-    if (pad->priv->pending_eos) {
+    if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
       pad->priv->pending_eos = FALSE;
       pad->priv->eos = TRUE;
     }
@@ -2236,8 +2479,14 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   GstBuffer *buffer = NULL;
 
   PAD_LOCK (pad);
-  if (pad->priv->buffer)
-    buffer = gst_buffer_ref (pad->priv->buffer);
+  buffer = g_queue_peek_tail (&pad->priv->buffers);
+  /* The tail should always be a buffer, because if it is an event,
+   * it will be consumed immeditaly in gst_aggregator_steal_buffer */
+
+  if (GST_IS_BUFFER (buffer))
+    gst_buffer_ref (buffer);
+  else
+    buffer = NULL;
   PAD_UNLOCK (pad);
 
   return buffer;
index cdfa9c2..9401d0c 100644 (file)
@@ -71,6 +71,8 @@ struct _GstAggregatorPad
 
   /* Protected by the OBJECT_LOCK */
   GstSegment segment;
+  /* Segment to use in the clip function, before the queue */
+  GstSegment clip_segment;
 
   /* < Private > */
   GstAggregatorPadPrivate   *  priv;