aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 18eb6d9..94d4816 100644 (file)
@@ -22,8 +22,8 @@
 /**
  * SECTION: gstaggregator
  * @title: GstAggregator
- * @short_description: manages a set of pads with the purpose of
- * aggregating their buffers.
+ * @short_description: Base class for mixers and muxers, manages a set of input
+ *     pads and aggregates their streams
  * @see_also: gstcollectpads for historical reasons.
  *
  * Manages a set of pads with the purpose of aggregating their buffers.
  *  * Base class for mixers and muxers. Subclasses should at least implement
  *    the #GstAggregatorClass.aggregate() virtual method.
  *
- *  * When data is queued on all pads, tha aggregate vmethod is called.
+ *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
+ *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
+ *    Subclasses should not overwrite those, but instead implement
+ *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
+ *    needed.
+ *
+ *  * When data is queued on all pads, the aggregate vmethod is called.
  *
  *  * One can peek at the data on any given GstAggregatorPad with the
- *    gst_aggregator_pad_get_buffer () method, and take ownership of it
- *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
- *    has been taken with steal_buffer (), a new buffer can be queued
+ *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ *    has been taken with pop_buffer (), a new buffer can be queued
  *    on that pad.
  *
  *  * If the subclass wishes to push a buffer downstream in its aggregate
  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
  *    to ease their identification and subsequent processing.
  *
+ *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
+ *    sink and source pads.
+ *    See gst_element_class_add_static_pad_template_with_gtype().
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
+ */
+
+/**
+ * SECTION: gstaggregatorpad
+ * @title: GstAggregatorPad
+ * @short_description: #GstPad subclass for pads managed by #GstAggregator
+ * @see_also: gstcollectpads for historical reasons.
+ *
+ * Pads managed by a #GstAggregor subclass.
+ *
+ * This class used to live in gst-plugins-bad and was moved to core.
+ *
+ * Since: 1.14
  */
 
 #ifdef HAVE_CONFIG_H
@@ -94,12 +120,21 @@ gst_aggregator_start_time_selection_get_type (void)
 }
 
 /*  Might become API */
+#if 0
 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
     const GstTagList * tags, GstTagMergeMode mode);
+#endif
 static void gst_aggregator_set_latency_property (GstAggregator * agg,
-    gint64 latency);
-static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
+    GstClockTime latency);
+static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
+
+static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
+
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
+    GstBuffer * buffer);
 
+GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
+#define GST_CAT_DEFAULT aggregator_debug
 
 /* Locking order, locks in this element must always be taken in this order
  *
@@ -113,12 +148,6 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
  */
 
-
-static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
-
-GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
-#define GST_CAT_DEFAULT aggregator_debug
-
 /* GstAggregatorPad definitions */
 #define PAD_LOCK(pad)   G_STMT_START {                                  \
   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
@@ -207,20 +236,24 @@ struct _GstAggregatorPadPrivate
   GstFlowReturn flow_return;
   gboolean pending_flush_start;
   gboolean pending_flush_stop;
-  gboolean pending_eos;
 
   gboolean first_buffer;
 
-  GQueue buffers;
+  GQueue data;                  /* buffers, events and queries */
   GstBuffer *clipped_buffer;
   guint num_buffers;
+
+  /* used to track fill state of queues, only used with live-src and when
+   * latency property is set to > 0 */
   GstClockTime head_position;
   GstClockTime tail_position;
-  GstClockTime head_time;
+  GstClockTime head_time;       /* running time */
   GstClockTime tail_time;
-  GstClockTime time_level;
+  GstClockTime time_level;      /* how much head is ahead of tail */
   GstSegment head_segment;      /* segment before the queue */
 
+  gboolean negotiated;
+
   gboolean eos;
 
   GMutex lock;
@@ -229,13 +262,15 @@ struct _GstAggregatorPadPrivate
    * the chain function is also happening.
    */
   GMutex flush_lock;
+
+  /* properties */
+  gboolean emit_signals;
 };
 
 /* Must be called with PAD_LOCK held */
 static void
 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
 {
-  aggpad->priv->pending_eos = FALSE;
   aggpad->priv->eos = FALSE;
   aggpad->priv->flow_return = GST_FLOW_OK;
   GST_OBJECT_LOCK (aggpad);
@@ -260,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
-    return klass->flush (aggpad, agg);
+    return (klass->flush (aggpad, agg) == GST_FLOW_OK);
 
   return TRUE;
 }
@@ -269,6 +304,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  * GstAggregator implementation  *
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
+static gint aggregator_private_offset = 0;
 
 /* All members are protected by the object lock unless otherwise noted */
 
@@ -279,6 +315,8 @@ struct _GstAggregatorPrivate
   /* Our state is >= PAUSED */
   gboolean running;             /* protected by src_lock */
 
+  /* seqnum from seek or segment,
+   * to be applied to synthetic segment/eos events */
   gint seqnum;
   gboolean send_stream_start;   /* protected by srcpad stream lock */
   gboolean send_segment;
@@ -299,6 +337,8 @@ struct _GstAggregatorPrivate
   GstClockTime sub_latency_min; /* protected by src_lock */
   GstClockTime sub_latency_max; /* protected by src_lock */
 
+  GstClockTime upstream_latency_min;    /* protected by src_lock */
+
   /* aggregate */
   GstClockID aggregate_id;      /* protected by src_lock */
   GMutex src_lock;
@@ -318,17 +358,21 @@ struct _GstAggregatorPrivate
   gint64 latency;               /* protected by both src_lock and all pad locks */
 };
 
+/* Seek event forwarding helper */
 typedef struct
 {
+  /* parameters */
   GstEvent *event;
-  gboolean result;
   gboolean flush;
   gboolean only_to_active_pads;
 
+  /* results */
+  gboolean result;
   gboolean one_actually_seeked;
 } EventData;
 
 #define DEFAULT_LATENCY              0
+#define DEFAULT_MIN_UPSTREAM_LATENCY              0
 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
 #define DEFAULT_START_TIME           (-1)
 
@@ -336,6 +380,7 @@ enum
 {
   PROP_0,
   PROP_LATENCY,
+  PROP_MIN_UPSTREAM_LATENCY,
   PROP_START_TIME_SELECTION,
   PROP_START_TIME,
   PROP_LAST
@@ -344,99 +389,20 @@ enum
 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
 
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
-    GstAggregatorPadForeachFunc func, gpointer user_data)
-{
-  gboolean result = FALSE;
-  GstIterator *iter;
-  gboolean done = FALSE;
-  GValue item = { 0, };
-  GList *seen_pads = NULL;
-
-  iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
-  if (!iter)
-    goto no_iter;
-
-  while (!done) {
-    switch (gst_iterator_next (iter, &item)) {
-      case GST_ITERATOR_OK:
-      {
-        GstAggregatorPad *pad;
-
-        pad = g_value_get_object (&item);
-
-        /* if already pushed, skip. FIXME, find something faster to tag pads */
-        if (pad == NULL || g_list_find (seen_pads, pad)) {
-          g_value_reset (&item);
-          break;
-        }
-
-        GST_LOG_OBJECT (pad, "calling function %s on pad",
-            GST_DEBUG_FUNCPTR_NAME (func));
-
-        result = func (self, pad, user_data);
-
-        done = !result;
-
-        seen_pads = g_list_prepend (seen_pads, pad);
-
-        g_value_reset (&item);
-        break;
-      }
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (iter);
-        break;
-      case GST_ITERATOR_ERROR:
-        GST_ERROR_OBJECT (self,
-            "Could not iterate over internally linked pads");
-        done = TRUE;
-        break;
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-    }
-  }
-  g_value_unset (&item);
-  gst_iterator_free (iter);
-
-  if (seen_pads == NULL) {
-    GST_DEBUG_OBJECT (self, "No pad seen");
-    return FALSE;
-  }
-
-  g_list_free (seen_pads);
-
-no_iter:
-  return result;
-}
-
 static gboolean
 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
 {
-  return (g_queue_peek_tail (&pad->priv->buffers) == NULL &&
+  return (g_queue_peek_tail (&pad->priv->data) == NULL &&
       pad->priv->clipped_buffer == NULL);
 }
 
 static gboolean
 gst_aggregator_check_pads_ready (GstAggregator * self)
 {
-  GstAggregatorPad *pad;
+  GstAggregatorPad *pad = NULL;
   GList *l, *sinkpads;
   gboolean have_buffer = TRUE;
-  gboolean have_event = FALSE;
+  gboolean have_event_or_query = FALSE;
 
   GST_LOG_OBJECT (self, "checking pads");
 
@@ -453,7 +419,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
 
     if (pad->priv->num_buffers == 0) {
       if (!gst_aggregator_pad_queue_is_empty (pad))
-        have_event = TRUE;
+        have_event_or_query = TRUE;
       if (!pad->priv->eos) {
         have_buffer = FALSE;
 
@@ -474,7 +440,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self)
     PAD_UNLOCK (pad);
   }
 
-  if (!have_buffer && !have_event)
+  if (!have_buffer && !have_event_or_query)
     goto pad_not_ready;
 
   if (have_buffer)
@@ -492,13 +458,13 @@ no_sinkpads:
   }
 pad_not_ready:
   {
-    if (have_event)
+    if (have_event_or_query)
       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
           " but waking up for serialized event");
     else
       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
     GST_OBJECT_UNLOCK (self);
-    return have_event;
+    return have_event_or_query;
   }
 }
 
@@ -508,7 +474,8 @@ gst_aggregator_reset_flow_values (GstAggregator * self)
   GST_OBJECT_LOCK (self);
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
-  gst_segment_init (&self->segment, GST_FORMAT_TIME);
+  gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
+      GST_FORMAT_TIME);
   self->priv->first_buffer = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
@@ -526,7 +493,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     GST_INFO_OBJECT (self, "pushing stream start");
     /* stream-start (FIXME: create id based on input ids) */
     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
-    if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
+    if (!gst_pad_push_event (GST_PAD (self->srcpad),
+            gst_event_new_stream_start (s_id))) {
       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
     }
     self->priv->send_stream_start = FALSE;
@@ -536,7 +504,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
 
     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
         self->priv->srccaps);
-    if (!gst_pad_push_event (self->srcpad,
+    if (!gst_pad_push_event (GST_PAD (self->srcpad),
             gst_event_new_caps (self->priv->srccaps))) {
       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
     }
@@ -546,9 +514,12 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
 
   GST_OBJECT_LOCK (self);
   if (self->priv->send_segment && !self->priv->flush_seeking) {
-    segment = gst_event_new_segment (&self->segment);
+    segment =
+        gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
 
     if (!self->priv->seqnum)
+      /* This code-path is in preparation to be able to run without a source
+       * connected. Then we won't have a seq-num from a segment event. */
       self->priv->seqnum = gst_event_get_seqnum (segment);
     else
       gst_event_set_seqnum (segment, self->priv->seqnum);
@@ -586,17 +557,8 @@ gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
   GST_PAD_STREAM_UNLOCK (self->srcpad);
 }
 
-/**
- * gst_aggregator_finish_buffer:
- * @self: The #GstAggregator
- * @buffer: (transfer full): the #GstBuffer to push.
- *
- * This method will push the provided output buffer downstream. If needed,
- * mandatory events such as stream-start, caps, and segment events will be
- * sent before pushing the buffer.
- */
-GstFlowReturn
-gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
+static GstFlowReturn
+gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
 {
   gst_aggregator_push_mandatory_events (self);
 
@@ -614,6 +576,25 @@ gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
   }
 }
 
+/**
+ * gst_aggregator_finish_buffer:
+ * @aggregator: The #GstAggregator
+ * @buffer: (transfer full): the #GstBuffer to push.
+ *
+ * This method will push the provided output buffer downstream. If needed,
+ * mandatory events such as stream-start, caps, and segment events will be
+ * sent before pushing the buffer.
+ */
+GstFlowReturn
+gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
+
+  g_assert (klass->finish_buffer != NULL);
+
+  return klass->finish_buffer (aggregator, buffer);
+}
+
 static void
 gst_aggregator_push_eos (GstAggregator * self)
 {
@@ -745,37 +726,112 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   return res;
 }
 
+typedef struct
+{
+  gboolean processed_event;
+  GstFlowReturn flow_ret;
+} DoHandleEventsAndQueriesData;
+
 static gboolean
-check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
+    gpointer user_data)
 {
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
   GstEvent *event = NULL;
+  GstQuery *query = NULL;
   GstAggregatorClass *klass = NULL;
-  gboolean *processed_event = user_data;
+  DoHandleEventsAndQueriesData *data = user_data;
 
   do {
     event = NULL;
+    query = NULL;
 
     PAD_LOCK (pad);
-    if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
-      pad->priv->pending_eos = FALSE;
-      pad->priv->eos = TRUE;
-    }
     if (pad->priv->clipped_buffer == NULL &&
-        GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) {
-      event = g_queue_pop_tail (&pad->priv->buffers);
-      PAD_BROADCAST_EVENT (pad);
+        !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+      if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
+        event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
+      if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
+        query = g_queue_peek_tail (&pad->priv->data);
     }
     PAD_UNLOCK (pad);
-    if (event) {
-      if (processed_event)
-        *processed_event = TRUE;
+    if (event || query) {
+      gboolean ret;
+
+      data->processed_event = TRUE;
       if (klass == NULL)
         klass = GST_AGGREGATOR_GET_CLASS (self);
 
-      GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
-      klass->sink_event (self, pad, event);
+      if (event) {
+        GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
+        gst_event_ref (event);
+        ret = klass->sink_event (aggregator, pad, event);
+
+        PAD_LOCK (pad);
+        if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
+          pad->priv->negotiated = ret;
+          if (!ret)
+            pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
+        }
+        if (g_queue_peek_tail (&pad->priv->data) == event)
+          gst_event_unref (g_queue_pop_tail (&pad->priv->data));
+        gst_event_unref (event);
+      } else if (query) {
+        GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
+        ret = klass->sink_query (aggregator, pad, query);
+
+        PAD_LOCK (pad);
+        if (g_queue_peek_tail (&pad->priv->data) == query) {
+          GstStructure *s;
+
+          s = gst_query_writable_structure (query);
+          gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
+              NULL);
+          g_queue_pop_tail (&pad->priv->data);
+        }
+      }
+
+      PAD_BROADCAST_EVENT (pad);
+      PAD_UNLOCK (pad);
     }
-  } while (event != NULL);
+  } while (event || query);
+
+  return TRUE;
+}
+
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+    gpointer user_data)
+{
+  GList *item;
+  GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+  GstAggregator *agg = (GstAggregator *) self;
+  GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+  if (!klass->skip_buffer)
+    return FALSE;
+
+  PAD_LOCK (aggpad);
+
+  item = g_queue_peek_head_link (&aggpad->priv->data);
+  while (item) {
+    GList *next = item->next;
+
+    if (GST_IS_BUFFER (item->data)
+        && klass->skip_buffer (aggpad, agg, item->data)) {
+      GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+      gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
+      gst_buffer_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
+    } else {
+      break;
+    }
+
+    item = next;
+  }
+
+  PAD_UNLOCK (aggpad);
 
   return TRUE;
 }
@@ -792,7 +848,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
   else
     aggpad->priv->flow_return = flow_return;
 
-  item = g_queue_peek_head_link (&aggpad->priv->buffers);
+  item = g_queue_peek_head_link (&aggpad->priv->data);
   while (item) {
     GList *next = item->next;
 
@@ -803,8 +859,9 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
         !GST_EVENT_IS_STICKY (item->data)) {
-      gst_mini_object_unref (item->data);
-      g_queue_delete_link (&aggpad->priv->buffers, item);
+      if (!GST_IS_QUERY (item->data))
+        gst_mini_object_unref (item->data);
+      g_queue_delete_link (&aggpad->priv->data, item);
     }
     item = next;
   }
@@ -1056,15 +1113,32 @@ gst_aggregator_aggregate_func (GstAggregator * self)
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && priv->running) {
     GstFlowReturn flow_return = GST_FLOW_OK;
-    gboolean processed_event = FALSE;
+    DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
+
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, &events_query_data);
 
-    gst_aggregator_iterate_sinkpads (self, check_events, NULL);
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
 
+    if (self->priv->peer_latency_live)
+      gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+          gst_aggregator_pad_skip_buffers, NULL);
+
+    /* Ensure we have buffers ready (either in clipped_buffer or at the head of
+     * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
-    gst_aggregator_iterate_sinkpads (self, check_events, &processed_event);
-    if (processed_event)
+    events_query_data.processed_event = FALSE;
+    events_query_data.flow_ret = GST_FLOW_OK;
+    gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+        gst_aggregator_do_events_and_queries, &events_query_data);
+
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
+
+    if (events_query_data.processed_event)
       continue;
 
     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
@@ -1094,6 +1168,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       gst_aggregator_push_eos (self);
     }
 
+  handle_error:
     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
 
     if (flow_return != GST_FLOW_OK) {
@@ -1276,38 +1351,37 @@ gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
 static void
 update_time_level (GstAggregatorPad * aggpad, gboolean head)
 {
+  GstAggregatorPadPrivate *priv = aggpad->priv;
+
   if (head) {
-    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
-        aggpad->priv->head_segment.format == GST_FORMAT_TIME)
-      aggpad->priv->head_time =
-          gst_segment_to_running_time (&aggpad->priv->head_segment,
-          GST_FORMAT_TIME, aggpad->priv->head_position);
+    if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
+        priv->head_segment.format == GST_FORMAT_TIME)
+      priv->head_time = gst_segment_to_running_time (&priv->head_segment,
+          GST_FORMAT_TIME, priv->head_position);
     else
-      aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
+      priv->head_time = GST_CLOCK_TIME_NONE;
 
-    if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
-      aggpad->priv->tail_time = aggpad->priv->head_time;
+    if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
+      priv->tail_time = priv->head_time;
   } else {
-    if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
+    if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
         aggpad->segment.format == GST_FORMAT_TIME)
-      aggpad->priv->tail_time =
-          gst_segment_to_running_time (&aggpad->segment,
-          GST_FORMAT_TIME, aggpad->priv->tail_position);
+      priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
+          GST_FORMAT_TIME, priv->tail_position);
     else
-      aggpad->priv->tail_time = aggpad->priv->head_time;
+      priv->tail_time = priv->head_time;
   }
 
-  if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
-      aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
-    aggpad->priv->time_level = 0;
+  if (priv->head_time == GST_CLOCK_TIME_NONE ||
+      priv->tail_time == GST_CLOCK_TIME_NONE) {
+    priv->time_level = 0;
     return;
   }
 
-  if (aggpad->priv->tail_time > aggpad->priv->head_time)
-    aggpad->priv->time_level = 0;
+  if (priv->tail_time > priv->head_time)
+    priv->time_level = 0;
   else
-    aggpad->priv->time_level = aggpad->priv->head_time -
-        aggpad->priv->tail_time;
+    priv->time_level = priv->head_time - priv->tail_time;
 }
 
 
@@ -1320,6 +1394,8 @@ gst_aggregator_default_sink_event (GstAggregator * self,
   GstPad *pad = GST_PAD (aggpad);
   GstAggregatorPrivate *priv = self->priv;
 
+  GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
     {
@@ -1330,8 +1406,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     }
     case GST_EVENT_FLUSH_STOP:
     {
-      GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
-
       gst_aggregator_pad_flush (aggpad, self);
       GST_OBJECT_LOCK (self);
       if (priv->flush_seeking) {
@@ -1363,21 +1437,11 @@ gst_aggregator_default_sink_event (GstAggregator * self,
     }
     case GST_EVENT_EOS:
     {
-      GST_DEBUG_OBJECT (aggpad, "EOS");
-
-      /* We still have a buffer, and we don't want the subclass to have to
-       * check for it. Mark pending_eos, eos will be set when steal_buffer is
-       * called
-       */
       SRC_LOCK (self);
       PAD_LOCK (aggpad);
-      if (aggpad->priv->num_buffers == 0) {
-        aggpad->priv->eos = TRUE;
-      } else {
-        aggpad->priv->pending_eos = TRUE;
-      }
+      g_assert (aggpad->priv->num_buffers == 0);
+      aggpad->priv->eos = TRUE;
       PAD_UNLOCK (aggpad);
-
       SRC_BROADCAST (self);
       SRC_UNLOCK (self);
       goto eat;
@@ -1387,6 +1451,10 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       PAD_LOCK (aggpad);
       GST_OBJECT_LOCK (aggpad);
       gst_event_copy_segment (event, &aggpad->segment);
+      /* We've got a new segment, tail_position is now meaningless
+       * and may interfere with the time_level calculation
+       */
+      aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
       update_time_level (aggpad, FALSE);
       GST_OBJECT_UNLOCK (aggpad);
       PAD_UNLOCK (aggpad);
@@ -1407,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       GstBuffer *gapbuf;
 
       gst_event_parse_gap (event, &pts, &duration);
-      gapbuf = gst_buffer_new ();
 
       if (GST_CLOCK_TIME_IS_VALID (duration))
         endpts = pts + duration;
@@ -1429,11 +1496,18 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       else
         duration = GST_CLOCK_TIME_NONE;
 
+      gapbuf = gst_buffer_new ();
       GST_BUFFER_PTS (gapbuf) = pts;
       GST_BUFFER_DURATION (gapbuf) = duration;
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
 
+      /* Remove GAP event so we can replace it with the buffer */
+      PAD_LOCK (aggpad);
+      if (g_queue_peek_tail (&aggpad->priv->data) == event)
+        gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+      PAD_UNLOCK (aggpad);
+
       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
           GST_FLOW_OK) {
         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
@@ -1443,19 +1517,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       goto eat;
     }
     case GST_EVENT_TAG:
-    {
-      GstTagList *tags;
-
-      gst_event_parse_tag (event, &tags);
-
-      if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
-        gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
-        gst_event_unref (event);
-        event = NULL;
-        goto eat;
-      }
-      break;
-    }
+      goto eat;
     default:
     {
       break;
@@ -1473,11 +1535,19 @@ eat:
   return res;
 }
 
-static inline gboolean
-gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
-    gpointer unused_udata)
+static gboolean
+gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
 {
-  gst_aggregator_pad_flush (pad, self);
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+  GstAggregator *agg = GST_AGGREGATOR_CAST (self);
+
+  gst_aggregator_pad_flush (pad, agg);
+
+  PAD_LOCK (pad);
+  pad->priv->flow_return = GST_FLOW_FLUSHING;
+  pad->priv->negotiated = FALSE;
+  PAD_BROADCAST_EVENT (pad);
+  PAD_UNLOCK (pad);
 
   return TRUE;
 }
@@ -1490,7 +1560,9 @@ gst_aggregator_stop (GstAggregator * agg)
 
   gst_aggregator_reset_flow_values (agg);
 
-  gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
+  /* Application needs to make sure no pads are added while it shuts us down */
+  gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
+      gst_aggregator_stop_pad, NULL);
 
   klass = GST_AGGREGATOR_GET_CLASS (agg);
 
@@ -1501,7 +1573,7 @@ gst_aggregator_stop (GstAggregator * agg)
 
   agg->priv->has_peer_latency = FALSE;
   agg->priv->peer_latency_live = FALSE;
-  agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
+  agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
 
   if (agg->priv->tags)
     gst_tag_list_unref (agg->priv->tags);
@@ -1585,25 +1657,39 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   GstAggregatorPrivate *priv = self->priv;
   gint serial = 0;
   gchar *name = NULL;
+  GType pad_type =
+      GST_PAD_TEMPLATE_GTYPE (templ) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
 
-  if (templ->direction != GST_PAD_SINK ||
-      g_strcmp0 (templ->name_template, "sink_%u") != 0)
+  if (templ->direction != GST_PAD_SINK)
     goto not_sink;
 
+  if (templ->presence != GST_PAD_REQUEST)
+    goto not_request;
+
   GST_OBJECT_LOCK (self);
   if (req_name == NULL || strlen (req_name) < 6
-      || !g_str_has_prefix (req_name, "sink_")) {
+      || !g_str_has_prefix (req_name, "sink_")
+      || strrchr (req_name, '%') != NULL) {
     /* no name given when requesting the pad, use next available int */
     serial = ++priv->max_padserial;
   } else {
+    gchar *endptr = NULL;
+
     /* parse serial number from requested padname */
-    serial = g_ascii_strtoull (&req_name[5], NULL, 10);
-    if (serial > priv->max_padserial)
-      priv->max_padserial = serial;
+    serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
+    if (endptr != NULL && *endptr == '\0') {
+      if (serial > priv->max_padserial) {
+        priv->max_padserial = serial;
+      }
+    } else {
+      serial = ++priv->max_padserial;
+    }
   }
 
   name = g_strdup_printf ("sink_%u", serial);
-  agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+  agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
 
@@ -1614,7 +1700,12 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
   /* errors */
 not_sink:
   {
-    GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad\n");
+    GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
+    return NULL;
+  }
+not_request:
+  {
+    GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
     return NULL;
   }
 }
@@ -1664,14 +1755,22 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
 
   gst_query_parse_latency (query, &live, &min, &max);
 
-  our_latency = self->priv->latency;
-
   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
     return FALSE;
   }
 
+  if (self->priv->upstream_latency_min > min) {
+    GstClockTimeDiff diff =
+        GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
+
+    min += diff;
+    if (GST_CLOCK_TIME_IS_VALID (max)) {
+      max += diff;
+    }
+  }
+
   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
@@ -1680,6 +1779,8 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
     return FALSE;
   }
 
+  our_latency = self->priv->latency;
+
   self->priv->peer_latency_live = live;
   self->priv->peer_latency_min = min;
   self->priv->peer_latency_max = max;
@@ -1781,8 +1882,8 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
         &start, &stop_type, &stop);
 
     GST_OBJECT_LOCK (self);
-    gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
-        stop_type, stop, NULL);
+    gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+        flags, start_type, start, stop_type, stop, NULL);
     self->priv->seqnum = gst_event_get_seqnum (event);
     self->priv->first_buffer = FALSE;
     GST_OBJECT_UNLOCK (self);
@@ -1841,7 +1942,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
     } else {
       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
-      gst_object_unref (peer);
     }
   }
 
@@ -1881,26 +1981,24 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
   evdata->result &= ret;
 
+  if (peer)
+    gst_object_unref (peer);
+
   /* Always send to all pads */
   return FALSE;
 }
 
-static EventData
+static void
 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
-    GstEvent * event, gboolean flush, gboolean only_to_active_pads)
+    EventData * evdata)
 {
-  EventData evdata;
-
-  evdata.event = event;
-  evdata.result = TRUE;
-  evdata.flush = flush;
-  evdata.one_actually_seeked = FALSE;
-  evdata.only_to_active_pads = only_to_active_pads;
+  evdata->result = TRUE;
+  evdata->one_actually_seeked = FALSE;
 
   /* We first need to set all pads as flushing in a first pass
    * as flush_start flush_stop is sometimes sent synchronously
    * while we send the seek event */
-  if (flush) {
+  if (evdata->flush) {
     GList *l;
 
     GST_OBJECT_LOCK (self);
@@ -1915,11 +2013,9 @@ gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
     GST_OBJECT_UNLOCK (self);
   }
 
-  gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
-
-  gst_event_unref (event);
+  gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
 
-  return evdata;
+  gst_event_unref (evdata->event);
 }
 
 static gboolean
@@ -1931,7 +2027,7 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
   GstSeekType start_type, stop_type;
   gint64 start, stop;
   gboolean flush;
-  EventData evdata;
+  EventData evdata = { 0, };
   GstAggregatorPrivate *priv = self->priv;
 
   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
@@ -1947,16 +2043,18 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
     priv->flush_seeking = TRUE;
   }
 
-  gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
-      stop_type, stop, NULL);
+  gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+      flags, start_type, start, stop_type, stop, NULL);
 
   /* Seeking sets a position */
   self->priv->first_buffer = FALSE;
   GST_OBJECT_UNLOCK (self);
 
   /* forward the seek upstream */
-  evdata =
-      gst_aggregator_forward_event_to_all_sinkpads (self, event, flush, FALSE);
+  evdata.event = event;
+  evdata.flush = flush;
+  evdata.only_to_active_pads = FALSE;
+  gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
   event = NULL;
 
   if (!evdata.result || !evdata.one_actually_seeked) {
@@ -1974,41 +2072,28 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
 static gboolean
 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
 {
-  EventData evdata;
-  gboolean res = TRUE;
+  EventData evdata = { 0, };
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_SEEK:
-    {
-      gst_event_ref (event);
-      res = gst_aggregator_do_seek (self, event);
-      gst_event_unref (event);
-      event = NULL;
-      goto done;
-    }
+      /* _do_seek() unrefs the event. */
+      return gst_aggregator_do_seek (self, event);
     case GST_EVENT_NAVIGATION:
-    {
       /* navigation is rather pointless. */
-      res = FALSE;
       gst_event_unref (event);
-      goto done;
-    }
+      return FALSE;
     default:
-    {
       break;
-    }
   }
 
   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
    * they will receive a QOS event that has earliest_time=0 (because we can't
    * have negative timestamps), and consider their buffer as too late */
-  evdata =
-      gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE,
-      GST_EVENT_TYPE (event) == GST_EVENT_QOS);
-  res = evdata.result;
-
-done:
-  return res;
+  evdata.event = event;
+  evdata.flush = FALSE;
+  evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
+  gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
+  return evdata.result;
 }
 
 static gboolean
@@ -2071,6 +2156,45 @@ gst_aggregator_default_sink_query (GstAggregator * self,
 {
   GstPad *pad = GST_PAD (aggpad);
 
+  if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
+    GstQuery *decide_query = NULL;
+    GstAggregatorClass *agg_class;
+    gboolean ret;
+
+    GST_OBJECT_LOCK (self);
+    PAD_LOCK (aggpad);
+    if (G_UNLIKELY (!aggpad->priv->negotiated)) {
+      GST_DEBUG_OBJECT (self,
+          "not negotiated yet, can't answer ALLOCATION query");
+      PAD_UNLOCK (aggpad);
+      GST_OBJECT_UNLOCK (self);
+
+      return FALSE;
+    }
+
+    if ((decide_query = self->priv->allocation_query))
+      gst_query_ref (decide_query);
+    PAD_UNLOCK (aggpad);
+    GST_OBJECT_UNLOCK (self);
+
+    GST_DEBUG_OBJECT (self,
+        "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
+
+    agg_class = GST_AGGREGATOR_GET_CLASS (self);
+
+    /* pass the query to the propose_allocation vmethod if any */
+    if (agg_class->propose_allocation)
+      ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
+    else
+      ret = FALSE;
+
+    if (decide_query)
+      gst_query_unref (decide_query);
+
+    GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
+    return ret;
+  }
+
   return gst_pad_query_default (pad, GST_OBJECT (self), query);
 }
 
@@ -2095,7 +2219,7 @@ gst_aggregator_finalize (GObject * object)
  * as unresponsive.
  */
 static void
-gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
+gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
 {
   gboolean changed;
 
@@ -2146,12 +2270,12 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
  * before a pad is deemed unresponsive. A value of -1 means an
  * unlimited time.
  */
-static gint64
+static GstClockTime
 gst_aggregator_get_latency_property (GstAggregator * agg)
 {
-  gint64 res;
+  GstClockTime res;
 
-  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+  g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
 
   GST_OBJECT_LOCK (agg);
   res = agg->priv->latency;
@@ -2168,7 +2292,12 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
+      gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
+      break;
+    case PROP_MIN_UPSTREAM_LATENCY:
+      SRC_LOCK (agg);
+      agg->priv->upstream_latency_min = g_value_get_uint64 (value);
+      SRC_UNLOCK (agg);
       break;
     case PROP_START_TIME_SELECTION:
       agg->priv->start_time_selection = g_value_get_enum (value);
@@ -2190,7 +2319,12 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_LATENCY:
-      g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
+      g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
+      break;
+    case PROP_MIN_UPSTREAM_LATENCY:
+      SRC_LOCK (agg);
+      g_value_set_uint64 (value, agg->priv->upstream_latency_min);
+      SRC_UNLOCK (agg);
       break;
     case PROP_START_TIME_SELECTION:
       g_value_set_enum (value, agg->priv->start_time_selection);
@@ -2212,12 +2346,14 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GstElementClass *gstelement_class = (GstElementClass *) klass;
 
   aggregator_parent_class = g_type_class_peek_parent (klass);
-  g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
 
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
-  klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
+  if (aggregator_private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
+
+  klass->finish_buffer = gst_aggregator_default_finish_buffer;
 
   klass->sink_event = gst_aggregator_default_sink_event;
   klass->sink_query = gst_aggregator_default_sink_query;
@@ -2243,11 +2379,31 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   gobject_class->finalize = gst_aggregator_finalize;
 
   g_object_class_install_property (gobject_class, PROP_LATENCY,
-      g_param_spec_int64 ("latency", "Buffer latency",
+      g_param_spec_uint64 ("latency", "Buffer latency",
           "Additional latency in live mode to allow upstream "
           "to take longer to produce buffers for the current "
-          "position (in nanoseconds)", 0,
-          (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
+          "position (in nanoseconds)", 0, G_MAXUINT64,
+          DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAggregator:min-upstream-latency:
+   *
+   * Force minimum upstream latency (in nanoseconds). When sources with a
+   * higher latency are expected to be plugged in dynamically after the
+   * aggregator has started playing, this allows overriding the minimum
+   * latency reported by the initial source(s). This is only taken into
+   * account when larger than the actually reported minimum latency.
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
+      g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
+          "When sources with a higher latency are expected to be plugged "
+          "in dynamically after the aggregator has started playing, "
+          "this allows overriding the minimum latency reported by the "
+          "initial source(s). This is only taken into account when larger "
+          "than the actually reported minimum latency. (nanoseconds)",
+          0, G_MAXUINT64,
           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
@@ -2262,8 +2418,12 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "Start time to use if start-time-selection=set", 0,
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+}
 
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
+static inline gpointer
+gst_aggregator_get_instance_private (GstAggregator * self)
+{
+  return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
 }
 
 static void
@@ -2271,12 +2431,11 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 {
   GstPadTemplate *pad_template;
   GstAggregatorPrivate *priv;
+  GType pad_type;
 
   g_return_if_fail (klass->aggregate != NULL);
 
-  self->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
-      GstAggregatorPrivate);
+  self->priv = gst_aggregator_get_instance_private (self);
 
   priv = self->priv;
 
@@ -2291,9 +2450,17 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
   self->priv->has_peer_latency = FALSE;
-  gst_aggregator_reset_flow_values (self);
 
-  self->srcpad = gst_pad_new_from_template (pad_template, "src");
+  pad_type =
+      GST_PAD_TEMPLATE_GTYPE (pad_template) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
+      GST_PAD_TEMPLATE_GTYPE (pad_template);
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+  self->srcpad =
+      g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
+      "template", pad_template, NULL);
+
+  gst_aggregator_reset_flow_values (self);
 
   gst_pad_set_event_function (self->srcpad,
       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
@@ -2304,6 +2471,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
+  self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
   self->priv->latency = DEFAULT_LATENCY;
   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
   self->priv->start_time = DEFAULT_START_TIME;
@@ -2335,6 +2503,10 @@ gst_aggregator_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
+
+    aggregator_private_offset =
+        g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
+
     g_once_init_leave (&type, _type);
   }
   return type;
@@ -2391,6 +2563,12 @@ apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
   update_time_level (aggpad, head);
 }
 
+/*
+ * Can be called either from the sinkpad's chain function or from the srcpad's
+ * thread in the case of a buffer synthetized from a GAP event.
+ * Because of this second case, FLUSH_LOCK can't be used here.
+ */
+
 static GstFlowReturn
 gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
@@ -2398,18 +2576,11 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GstFlowReturn flow_return;
   GstClockTime buf_pts;
 
-  GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
-  PAD_FLUSH_LOCK (aggpad);
-
   PAD_LOCK (aggpad);
   flow_return = aggpad->priv->flow_return;
   if (flow_return != GST_FLOW_OK)
     goto flushing;
 
-  if (aggpad->priv->pending_eos == TRUE)
-    goto eos;
-
   PAD_UNLOCK (aggpad);
 
   buf_pts = GST_BUFFER_PTS (buffer);
@@ -2424,12 +2595,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
       aggpad->priv->first_buffer = FALSE;
     }
 
-    if (gst_aggregator_pad_has_space (self, aggpad)
+    if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
       if (head)
-        g_queue_push_head (&aggpad->priv->buffers, buffer);
+        g_queue_push_head (&aggpad->priv->data, buffer);
       else
-        g_queue_push_tail (&aggpad->priv->buffers, buffer);
+        g_queue_push_tail (&aggpad->priv->data, buffer);
       apply_buffer (aggpad, buffer, head);
       aggpad->priv->num_buffers++;
       buffer = NULL;
@@ -2453,6 +2624,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   if (self->priv->first_buffer) {
     GstClockTime start_time;
+    GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
 
     switch (self->priv->start_time_selection) {
       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
@@ -2486,10 +2658,10 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
     }
 
     if (start_time != -1) {
-      if (self->segment.position == -1)
-        self->segment.position = start_time;
+      if (srcpad->segment.position == -1)
+        srcpad->segment.position = start_time;
       else
-        self->segment.position = MIN (start_time, self->segment.position);
+        srcpad->segment.position = MIN (start_time, srcpad->segment.position);
 
       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
           GST_TIME_ARGS (start_time));
@@ -2500,15 +2672,12 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
   GST_OBJECT_UNLOCK (self);
   SRC_UNLOCK (self);
 
-  PAD_FLUSH_UNLOCK (aggpad);
-
   GST_DEBUG_OBJECT (aggpad, "Done chaining");
 
   return flow_return;
 
 flushing:
   PAD_UNLOCK (aggpad);
-  PAD_FLUSH_UNLOCK (aggpad);
 
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
       gst_flow_get_name (flow_return));
@@ -2516,56 +2685,83 @@ flushing:
     gst_buffer_unref (buffer);
 
   return flow_return;
-
-eos:
-  PAD_UNLOCK (aggpad);
-  PAD_FLUSH_UNLOCK (aggpad);
-
-  gst_buffer_unref (buffer);
-  GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
-
-  return GST_FLOW_EOS;
 }
 
 static GstFlowReturn
 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
 {
-  return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
-      GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+  GstFlowReturn ret;
+  GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+
+  PAD_FLUSH_LOCK (aggpad);
+
+  ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+      aggpad, buffer, TRUE);
+
+  PAD_FLUSH_UNLOCK (aggpad);
+
+  return ret;
 }
 
 static gboolean
 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
     GstQuery * query)
 {
+  GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
   if (GST_QUERY_IS_SERIALIZED (query)) {
+    GstStructure *s;
+    gboolean ret = FALSE;
+
+    SRC_LOCK (self);
     PAD_LOCK (aggpad);
 
+    if (aggpad->priv->flow_return != GST_FLOW_OK) {
+      SRC_UNLOCK (self);
+      goto flushing;
+    }
+
+    g_queue_push_head (&aggpad->priv->data, query);
+    SRC_BROADCAST (self);
+    SRC_UNLOCK (self);
+
     while (!gst_aggregator_pad_queue_is_empty (aggpad)
         && aggpad->priv->flow_return == GST_FLOW_OK) {
       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
       PAD_WAIT_EVENT (aggpad);
     }
 
+    s = gst_query_writable_structure (query);
+    if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
+      gst_structure_remove_field (s, "gst-aggregator-retval");
+    else
+      g_queue_remove (&aggpad->priv->data, query);
+
     if (aggpad->priv->flow_return != GST_FLOW_OK)
       goto flushing;
 
     PAD_UNLOCK (aggpad);
-  }
 
-  return klass->sink_query (GST_AGGREGATOR (parent),
-      GST_AGGREGATOR_PAD (pad), query);
+    return ret;
+  } else {
+    GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
+
+    return klass->sink_query (self, aggpad, query);
+  }
 
 flushing:
   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
       gst_flow_get_name (aggpad->priv->flow_return));
   PAD_UNLOCK (aggpad);
+
   return FALSE;
 }
 
+/* Queue serialized events and let the others go through directly.
+ * The queued events with be handled from the src-pad task in
+ * gst_aggregator_do_events_and_queries().
+ */
 static GstFlowReturn
 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
     GstEvent * event)
@@ -2573,18 +2769,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
   GstFlowReturn ret = GST_FLOW_OK;
   GstAggregator *self = GST_AGGREGATOR (parent);
   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
-  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
-  if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
-      /* && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE */ ) {
+  if (GST_EVENT_IS_SERIALIZED (event)
+      && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
     SRC_LOCK (self);
     PAD_LOCK (aggpad);
 
-    if (aggpad->priv->flow_return != GST_FLOW_OK
-        && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
-      ret = aggpad->priv->flow_return;
+    if (aggpad->priv->flow_return != GST_FLOW_OK)
       goto flushing;
-    }
 
     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
       GST_OBJECT_LOCK (aggpad);
@@ -2594,18 +2786,14 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
       GST_OBJECT_UNLOCK (aggpad);
     }
 
-    if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
-      GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
-          event);
-      g_queue_push_head (&aggpad->priv->buffers, event);
-      event = NULL;
-      SRC_BROADCAST (self);
-    }
+    GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
+    g_queue_push_head (&aggpad->priv->data, event);
+    SRC_BROADCAST (self);
     PAD_UNLOCK (aggpad);
     SRC_UNLOCK (self);
-  }
+  } else {
+    GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
 
-  if (event) {
     if (!klass->sink_event (self, aggpad, event)) {
       /* Copied from GstPad to convert boolean to a GstFlowReturn in
        * the event handling func */
@@ -2624,7 +2812,7 @@ flushing:
     gst_pad_store_sticky_event (pad, event);
   gst_event_unref (event);
 
-  return ret;
+  return aggpad->priv->flow_return;
 }
 
 static gboolean
@@ -2652,21 +2840,39 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
 /***********************************
  * GstAggregatorPad implementation  *
  ************************************/
-G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+
+#define DEFAULT_PAD_EMIT_SIGNALS FALSE
+
+enum
+{
+  PAD_PROP_0,
+  PAD_PROP_EMIT_SIGNALS,
+};
+
+enum
+{
+  PAD_SIGNAL_BUFFER_CONSUMED,
+  PAD_LAST_SIGNAL,
+};
+
+static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
 
 static void
 gst_aggregator_pad_constructed (GObject * object)
 {
   GstPad *pad = GST_PAD (object);
 
-  gst_pad_set_chain_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
-  gst_pad_set_event_full_function_full (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
-  gst_pad_set_query_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
-  gst_pad_set_activatemode_function (pad,
-      GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  if (GST_PAD_IS_SINK (pad)) {
+    gst_pad_set_chain_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
+    gst_pad_set_event_full_function_full (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
+    gst_pad_set_query_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
+    gst_pad_set_activatemode_function (pad,
+        GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+  }
 }
 
 static void
@@ -2692,42 +2898,101 @@ gst_aggregator_pad_dispose (GObject * object)
 }
 
 static void
+gst_aggregator_pad_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      pad->priv->emit_signals = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_aggregator_pad_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      g_value_set_boolean (value, pad->priv->emit_signals);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
-
   gobject_class->constructed = gst_aggregator_pad_constructed;
   gobject_class->finalize = gst_aggregator_pad_finalize;
   gobject_class->dispose = gst_aggregator_pad_dispose;
+  gobject_class->set_property = gst_aggregator_pad_set_property;
+  gobject_class->get_property = gst_aggregator_pad_get_property;
+
+  /**
+   * GstAggregatorPad:buffer-consumed:
+   *
+   * Signals that a buffer was consumed. As aggregator pads store buffers
+   * in an internal queue, there is no direct match between input and output
+   * buffers at any given time. This signal can be useful to forward metas
+   * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
+   *
+   * Since: 1.16
+   */
+  gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
+      g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
+      G_TYPE_NONE, 1, GST_TYPE_BUFFER);
+
+  /**
+   * GstAggregatorPad:emit-signals:
+   *
+   * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
+      g_param_spec_boolean ("emit-signals", "Emit signals",
+          "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
 gst_aggregator_pad_init (GstAggregatorPad * pad)
 {
-  pad->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
-      GstAggregatorPadPrivate);
+  pad->priv = gst_aggregator_pad_get_instance_private (pad);
 
-  g_queue_init (&pad->priv->buffers);
+  g_queue_init (&pad->priv->data);
   g_cond_init (&pad->priv->event_cond);
 
   g_mutex_init (&pad->priv->flush_lock);
   g_mutex_init (&pad->priv->lock);
 
   gst_aggregator_pad_reset_unlocked (pad);
+  pad->priv->negotiated = FALSE;
+  pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
 }
 
 /* Must be called with the PAD_LOCK held */
 static void
-gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
 {
   pad->priv->num_buffers--;
-  GST_TRACE_OBJECT (pad, "Consuming buffer");
-  if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
-    pad->priv->pending_eos = FALSE;
-    pad->priv->eos = TRUE;
+  GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
+  if (buffer && pad->priv->emit_signals) {
+    g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
+        0, buffer);
   }
   PAD_BROADCAST_EVENT (pad);
 }
@@ -2737,12 +3002,12 @@ static void
 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
 {
   GstAggregator *self = NULL;
-  GstAggregatorClass *aggclass;
+  GstAggregatorClass *aggclass = NULL;
   GstBuffer *buffer = NULL;
 
   while (pad->priv->clipped_buffer == NULL &&
-      GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) {
-    buffer = g_queue_pop_tail (&pad->priv->buffers);
+      GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
+    buffer = g_queue_pop_tail (&pad->priv->data);
 
     apply_buffer (pad, buffer, FALSE);
 
@@ -2765,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
       buffer = aggclass->clip (self, pad, buffer);
 
       if (buffer == NULL) {
-        gst_aggregator_pad_buffer_consumed (pad);
+        gst_aggregator_pad_buffer_consumed (pad, buffer);
         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
       }
     }
@@ -2778,7 +3043,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
  * @pad: the pad to get buffer from
  *
  * Steal the ref to the buffer currently queued in @pad.
@@ -2787,19 +3052,24 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
  *   queued. You should unref the buffer after usage.
  */
 GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   buffer = pad->priv->clipped_buffer;
-  pad->priv->clipped_buffer = NULL;
 
   if (buffer) {
-    gst_aggregator_pad_buffer_consumed (pad);
+    pad->priv->clipped_buffer = NULL;
+    gst_aggregator_pad_buffer_consumed (pad, buffer);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
 
@@ -2821,7 +3091,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buf;
 
-  buf = gst_aggregator_pad_steal_buffer (pad);
+  buf = gst_aggregator_pad_pop_buffer (pad);
 
   if (buf == NULL)
     return FALSE;
@@ -2831,7 +3101,7 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
 }
 
 /**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
  * @pad: the pad to get buffer from
  *
  * Returns: (transfer full): A reference to the buffer in @pad or
@@ -2839,12 +3109,17 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
  * usage.
  */
 GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 {
   GstBuffer *buffer;
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   if (pad->priv->clipped_buffer) {
@@ -2857,6 +3132,37 @@ gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
   return buffer;
 }
 
+/**
+ * gst_aggregator_pad_has_buffer:
+ * @pad: the pad to check the buffer on
+ *
+ * This checks if a pad has a buffer available that will be returned by
+ * a call to gst_aggregator_pad_peek_buffer() or
+ * gst_aggregator_pad_pop_buffer().
+ *
+ * Returns: %TRUE if the pad has a buffer available as the next thing.
+ *
+ * Since: 1.14.1
+ */
+gboolean
+gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
+{
+  gboolean has_buffer;
+
+  PAD_LOCK (pad);
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+  has_buffer = (pad->priv->clipped_buffer != NULL);
+  PAD_UNLOCK (pad);
+
+  return has_buffer;
+}
+
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
 gboolean
 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
 {
@@ -2869,7 +3175,8 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
   return is_eos;
 }
 
-/**
+#if 0
+/*
  * gst_aggregator_merge_tags:
  * @self: a #GstAggregator
  * @tags: a #GstTagList to merge
@@ -2903,6 +3210,7 @@ gst_aggregator_merge_tags (GstAggregator * self,
   self->priv->tags_changed = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
+#endif
 
 /**
  * gst_aggregator_set_latency:
@@ -2993,3 +3301,40 @@ gst_aggregator_get_allocator (GstAggregator * self,
   if (params)
     *params = self->priv->allocation_params;
 }
+
+/**
+ * gst_aggregator_simple_get_next_time:
+ * @self: A #GstAggregator
+ *
+ * This is a simple #GstAggregator::get_next_time implementation that
+ * just looks at the #GstSegment on the srcpad of the aggregator and bases
+ * the next time on the running time there.
+ *
+ * This is the desired behaviour in most cases where you have a live source
+ * and you have a dead line based aggregator subclass.
+ *
+ * Returns: The running time based on the position
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_aggregator_simple_get_next_time (GstAggregator * self)
+{
+  GstClockTime next_time;
+  GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
+  GstSegment *segment = &srcpad->segment;
+
+  GST_OBJECT_LOCK (self);
+  if (segment->position == -1 || segment->position < segment->start)
+    next_time = segment->start;
+  else
+    next_time = segment->position;
+
+  if (segment->stop != -1 && next_time > segment->stop)
+    next_time = segment->stop;
+
+  next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+  GST_OBJECT_UNLOCK (self);
+
+  return next_time;
+}