aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 89778a3..94d4816 100644 (file)
  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
  *    to ease their identification and subsequent processing.
  *
+ *  * Subclasses must use (a subclass of) #GstAggregatorPad for both their
+ *    sink and source pads.
+ *    See gst_element_class_add_static_pad_template_with_gtype().
+ *
  * This class used to live in gst-plugins-bad and was moved to core.
  *
  * Since: 1.14
@@ -126,7 +130,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
-static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
+    GstBuffer * buffer);
 
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
@@ -257,6 +262,9 @@ struct _GstAggregatorPadPrivate
    * the chain function is also happening.
    */
   GMutex flush_lock;
+
+  /* properties */
+  gboolean emit_signals;
 };
 
 /* Must be called with PAD_LOCK held */
@@ -287,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
-    return klass->flush (aggpad, agg);
+    return (klass->flush (aggpad, agg) == GST_FLOW_OK);
 
   return TRUE;
 }
@@ -296,6 +304,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
  * GstAggregator implementation  *
  *************************************/
 static GstElementClass *aggregator_parent_class = NULL;
+static gint aggregator_private_offset = 0;
 
 /* All members are protected by the object lock unless otherwise noted */
 
@@ -328,6 +337,8 @@ struct _GstAggregatorPrivate
   GstClockTime sub_latency_min; /* protected by src_lock */
   GstClockTime sub_latency_max; /* protected by src_lock */
 
+  GstClockTime upstream_latency_min;    /* protected by src_lock */
+
   /* aggregate */
   GstClockID aggregate_id;      /* protected by src_lock */
   GMutex src_lock;
@@ -361,6 +372,7 @@ typedef struct
 } EventData;
 
 #define DEFAULT_LATENCY              0
+#define DEFAULT_MIN_UPSTREAM_LATENCY              0
 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
 #define DEFAULT_START_TIME           (-1)
 
@@ -368,6 +380,7 @@ enum
 {
   PROP_0,
   PROP_LATENCY,
+  PROP_MIN_UPSTREAM_LATENCY,
   PROP_START_TIME_SELECTION,
   PROP_START_TIME,
   PROP_LAST
@@ -461,7 +474,8 @@ gst_aggregator_reset_flow_values (GstAggregator * self)
   GST_OBJECT_LOCK (self);
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
-  gst_segment_init (&self->segment, GST_FORMAT_TIME);
+  gst_segment_init (&GST_AGGREGATOR_PAD (self->srcpad)->segment,
+      GST_FORMAT_TIME);
   self->priv->first_buffer = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
@@ -479,7 +493,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
     GST_INFO_OBJECT (self, "pushing stream start");
     /* stream-start (FIXME: create id based on input ids) */
     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
-    if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
+    if (!gst_pad_push_event (GST_PAD (self->srcpad),
+            gst_event_new_stream_start (s_id))) {
       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
     }
     self->priv->send_stream_start = FALSE;
@@ -489,7 +504,7 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
 
     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
         self->priv->srccaps);
-    if (!gst_pad_push_event (self->srcpad,
+    if (!gst_pad_push_event (GST_PAD (self->srcpad),
             gst_event_new_caps (self->priv->srccaps))) {
       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
     }
@@ -499,7 +514,8 @@ gst_aggregator_push_mandatory_events (GstAggregator * self)
 
   GST_OBJECT_LOCK (self);
   if (self->priv->send_segment && !self->priv->flush_seeking) {
-    segment = gst_event_new_segment (&self->segment);
+    segment =
+        gst_event_new_segment (&GST_AGGREGATOR_PAD (self->srcpad)->segment);
 
     if (!self->priv->seqnum)
       /* This code-path is in preparation to be able to run without a source
@@ -710,6 +726,12 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
   return res;
 }
 
+typedef struct
+{
+  gboolean processed_event;
+  GstFlowReturn flow_ret;
+} DoHandleEventsAndQueriesData;
+
 static gboolean
 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
     gpointer user_data)
@@ -719,7 +741,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
   GstEvent *event = NULL;
   GstQuery *query = NULL;
   GstAggregatorClass *klass = NULL;
-  gboolean *processed_event = user_data;
+  DoHandleEventsAndQueriesData *data = user_data;
 
   do {
     event = NULL;
@@ -737,8 +759,7 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
     if (event || query) {
       gboolean ret;
 
-      if (processed_event)
-        *processed_event = TRUE;
+      data->processed_event = TRUE;
       if (klass == NULL)
         klass = GST_AGGREGATOR_GET_CLASS (self);
 
@@ -748,8 +769,11 @@ gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
         ret = klass->sink_event (aggregator, pad, event);
 
         PAD_LOCK (pad);
-        if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+        if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
           pad->priv->negotiated = ret;
+          if (!ret)
+            pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
+        }
         if (g_queue_peek_tail (&pad->priv->data) == event)
           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
         gst_event_unref (event);
@@ -797,7 +821,7 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
     if (GST_IS_BUFFER (item->data)
         && klass->skip_buffer (aggpad, agg, item->data)) {
       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
-      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
       gst_buffer_unref (item->data);
       g_queue_delete_link (&aggpad->priv->data, item);
     } else {
@@ -1089,10 +1113,13 @@ gst_aggregator_aggregate_func (GstAggregator * self)
   GST_LOG_OBJECT (self, "Checking aggregate");
   while (priv->send_eos && priv->running) {
     GstFlowReturn flow_return = GST_FLOW_OK;
-    gboolean processed_event = FALSE;
+    DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
 
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, NULL);
+        gst_aggregator_do_events_and_queries, &events_query_data);
+
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
 
     if (self->priv->peer_latency_live)
       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
@@ -1103,10 +1130,15 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     if (!gst_aggregator_wait_and_check (self, &timeout))
       continue;
 
+    events_query_data.processed_event = FALSE;
+    events_query_data.flow_ret = GST_FLOW_OK;
     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
-        gst_aggregator_do_events_and_queries, &processed_event);
+        gst_aggregator_do_events_and_queries, &events_query_data);
+
+    if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+      goto handle_error;
 
-    if (processed_event)
+    if (events_query_data.processed_event)
       continue;
 
     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
@@ -1136,6 +1168,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       gst_aggregator_push_eos (self);
     }
 
+  handle_error:
     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
 
     if (flow_return != GST_FLOW_OK) {
@@ -1442,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       GstBuffer *gapbuf;
 
       gst_event_parse_gap (event, &pts, &duration);
-      gapbuf = gst_buffer_new ();
 
       if (GST_CLOCK_TIME_IS_VALID (duration))
         endpts = pts + duration;
@@ -1464,14 +1496,17 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       else
         duration = GST_CLOCK_TIME_NONE;
 
+      gapbuf = gst_buffer_new ();
       GST_BUFFER_PTS (gapbuf) = pts;
       GST_BUFFER_DURATION (gapbuf) = duration;
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
 
       /* Remove GAP event so we can replace it with the buffer */
+      PAD_LOCK (aggpad);
       if (g_queue_peek_tail (&aggpad->priv->data) == event)
         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+      PAD_UNLOCK (aggpad);
 
       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
           GST_FLOW_OK) {
@@ -1634,17 +1669,26 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
 
   GST_OBJECT_LOCK (self);
   if (req_name == NULL || strlen (req_name) < 6
-      || !g_str_has_prefix (req_name, "sink_")) {
+      || !g_str_has_prefix (req_name, "sink_")
+      || strrchr (req_name, '%') != NULL) {
     /* no name given when requesting the pad, use next available int */
     serial = ++priv->max_padserial;
   } else {
+    gchar *endptr = NULL;
+
     /* parse serial number from requested padname */
-    serial = g_ascii_strtoull (&req_name[5], NULL, 10);
-    if (serial > priv->max_padserial)
-      priv->max_padserial = serial;
+    serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
+    if (endptr != NULL && *endptr == '\0') {
+      if (serial > priv->max_padserial) {
+        priv->max_padserial = serial;
+      }
+    } else {
+      serial = ++priv->max_padserial;
+    }
   }
 
   name = g_strdup_printf ("sink_%u", serial);
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
   agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
@@ -1717,6 +1761,16 @@ gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
     return FALSE;
   }
 
+  if (self->priv->upstream_latency_min > min) {
+    GstClockTimeDiff diff =
+        GST_CLOCK_DIFF (min, self->priv->upstream_latency_min);
+
+    min += diff;
+    if (GST_CLOCK_TIME_IS_VALID (max)) {
+      max += diff;
+    }
+  }
+
   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
@@ -1828,8 +1882,8 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
         &start, &stop_type, &stop);
 
     GST_OBJECT_LOCK (self);
-    gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
-        stop_type, stop, NULL);
+    gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+        flags, start_type, start, stop_type, stop, NULL);
     self->priv->seqnum = gst_event_get_seqnum (event);
     self->priv->first_buffer = FALSE;
     GST_OBJECT_UNLOCK (self);
@@ -1888,7 +1942,6 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
     } else {
       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
-      gst_object_unref (peer);
     }
   }
 
@@ -1928,6 +1981,9 @@ gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
 
   evdata->result &= ret;
 
+  if (peer)
+    gst_object_unref (peer);
+
   /* Always send to all pads */
   return FALSE;
 }
@@ -1987,8 +2043,8 @@ gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
     priv->flush_seeking = TRUE;
   }
 
-  gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
-      stop_type, stop, NULL);
+  gst_segment_do_seek (&GST_AGGREGATOR_PAD (self->srcpad)->segment, rate, fmt,
+      flags, start_type, start, stop_type, stop, NULL);
 
   /* Seeking sets a position */
   self->priv->first_buffer = FALSE;
@@ -2238,6 +2294,11 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
     case PROP_LATENCY:
       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
       break;
+    case PROP_MIN_UPSTREAM_LATENCY:
+      SRC_LOCK (agg);
+      agg->priv->upstream_latency_min = g_value_get_uint64 (value);
+      SRC_UNLOCK (agg);
+      break;
     case PROP_START_TIME_SELECTION:
       agg->priv->start_time_selection = g_value_get_enum (value);
       break;
@@ -2260,6 +2321,11 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
     case PROP_LATENCY:
       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
       break;
+    case PROP_MIN_UPSTREAM_LATENCY:
+      SRC_LOCK (agg);
+      g_value_set_uint64 (value, agg->priv->upstream_latency_min);
+      SRC_UNLOCK (agg);
+      break;
     case PROP_START_TIME_SELECTION:
       g_value_set_enum (value, agg->priv->start_time_selection);
       break;
@@ -2280,11 +2346,13 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
   GstElementClass *gstelement_class = (GstElementClass *) klass;
 
   aggregator_parent_class = g_type_class_peek_parent (klass);
-  g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
 
   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
       GST_DEBUG_FG_MAGENTA, "GstAggregator");
 
+  if (aggregator_private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
+
   klass->finish_buffer = gst_aggregator_default_finish_buffer;
 
   klass->sink_event = gst_aggregator_default_sink_event;
@@ -2317,6 +2385,27 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "position (in nanoseconds)", 0, G_MAXUINT64,
           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstAggregator:min-upstream-latency:
+   *
+   * Force minimum upstream latency (in nanoseconds). When sources with a
+   * higher latency are expected to be plugged in dynamically after the
+   * aggregator has started playing, this allows overriding the minimum
+   * latency reported by the initial source(s). This is only taken into
+   * account when larger than the actually reported minimum latency.
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
+      g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
+          "When sources with a higher latency are expected to be plugged "
+          "in dynamically after the aggregator has started playing, "
+          "this allows overriding the minimum latency reported by the "
+          "initial source(s). This is only taken into account when larger "
+          "than the actually reported minimum latency. (nanoseconds)",
+          0, G_MAXUINT64,
+          DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
       g_param_spec_enum ("start-time-selection", "Start Time Selection",
           "Decides which start time is output",
@@ -2331,17 +2420,22 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
+static inline gpointer
+gst_aggregator_get_instance_private (GstAggregator * self)
+{
+  return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
+}
+
 static void
 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 {
   GstPadTemplate *pad_template;
   GstAggregatorPrivate *priv;
+  GType pad_type;
 
   g_return_if_fail (klass->aggregate != NULL);
 
-  self->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
-      GstAggregatorPrivate);
+  self->priv = gst_aggregator_get_instance_private (self);
 
   priv = self->priv;
 
@@ -2356,9 +2450,17 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
   self->priv->has_peer_latency = FALSE;
-  gst_aggregator_reset_flow_values (self);
 
-  self->srcpad = gst_pad_new_from_template (pad_template, "src");
+  pad_type =
+      GST_PAD_TEMPLATE_GTYPE (pad_template) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
+      GST_PAD_TEMPLATE_GTYPE (pad_template);
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+  self->srcpad =
+      g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
+      "template", pad_template, NULL);
+
+  gst_aggregator_reset_flow_values (self);
 
   gst_pad_set_event_function (self->srcpad,
       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
@@ -2369,6 +2471,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
 
+  self->priv->upstream_latency_min = DEFAULT_MIN_UPSTREAM_LATENCY;
   self->priv->latency = DEFAULT_LATENCY;
   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
   self->priv->start_time = DEFAULT_START_TIME;
@@ -2400,6 +2503,10 @@ gst_aggregator_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
+
+    aggregator_private_offset =
+        g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
+
     g_once_init_leave (&type, _type);
   }
   return type;
@@ -2517,6 +2624,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
 
   if (self->priv->first_buffer) {
     GstClockTime start_time;
+    GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
 
     switch (self->priv->start_time_selection) {
       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
@@ -2550,10 +2658,10 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
     }
 
     if (start_time != -1) {
-      if (self->segment.position == -1)
-        self->segment.position = start_time;
+      if (srcpad->segment.position == -1)
+        srcpad->segment.position = start_time;
       else
-        self->segment.position = MIN (start_time, self->segment.position);
+        srcpad->segment.position = MIN (start_time, srcpad->segment.position);
 
       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
           GST_TIME_ARGS (start_time));
@@ -2732,7 +2840,23 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
 /***********************************
  * GstAggregatorPad implementation  *
  ************************************/
-G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+
+#define DEFAULT_PAD_EMIT_SIGNALS FALSE
+
+enum
+{
+  PAD_PROP_0,
+  PAD_PROP_EMIT_SIGNALS,
+};
+
+enum
+{
+  PAD_SIGNAL_BUFFER_CONSUMED,
+  PAD_LAST_SIGNAL,
+};
+
+static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
 
 static void
 gst_aggregator_pad_constructed (GObject * object)
@@ -2774,23 +2898,80 @@ gst_aggregator_pad_dispose (GObject * object)
 }
 
 static void
+gst_aggregator_pad_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      pad->priv->emit_signals = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_aggregator_pad_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      g_value_set_boolean (value, pad->priv->emit_signals);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
 
-  g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
-
   gobject_class->constructed = gst_aggregator_pad_constructed;
   gobject_class->finalize = gst_aggregator_pad_finalize;
   gobject_class->dispose = gst_aggregator_pad_dispose;
+  gobject_class->set_property = gst_aggregator_pad_set_property;
+  gobject_class->get_property = gst_aggregator_pad_get_property;
+
+  /**
+   * GstAggregatorPad:buffer-consumed:
+   *
+   * Signals that a buffer was consumed. As aggregator pads store buffers
+   * in an internal queue, there is no direct match between input and output
+   * buffers at any given time. This signal can be useful to forward metas
+   * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
+   *
+   * Since: 1.16
+   */
+  gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
+      g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
+      G_TYPE_NONE, 1, GST_TYPE_BUFFER);
+
+  /**
+   * GstAggregatorPad:emit-signals:
+   *
+   * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
+      g_param_spec_boolean ("emit-signals", "Emit signals",
+          "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
 gst_aggregator_pad_init (GstAggregatorPad * pad)
 {
-  pad->priv =
-      G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
-      GstAggregatorPadPrivate);
+  pad->priv = gst_aggregator_pad_get_instance_private (pad);
 
   g_queue_init (&pad->priv->data);
   g_cond_init (&pad->priv->event_cond);
@@ -2800,14 +2981,19 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 
   gst_aggregator_pad_reset_unlocked (pad);
   pad->priv->negotiated = FALSE;
+  pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
 }
 
 /* Must be called with the PAD_LOCK held */
 static void
-gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
 {
   pad->priv->num_buffers--;
-  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
+  if (buffer && pad->priv->emit_signals) {
+    g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
+        0, buffer);
+  }
   PAD_BROADCAST_EVENT (pad);
 }
 
@@ -2844,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
       buffer = aggclass->clip (self, pad, buffer);
 
       if (buffer == NULL) {
-        gst_aggregator_pad_buffer_consumed (pad);
+        gst_aggregator_pad_buffer_consumed (pad, buffer);
         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
       }
     }
@@ -2872,13 +3058,18 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   buffer = pad->priv->clipped_buffer;
 
   if (buffer) {
     pad->priv->clipped_buffer = NULL;
-    gst_aggregator_pad_buffer_consumed (pad);
+    gst_aggregator_pad_buffer_consumed (pad, buffer);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
 
@@ -2924,6 +3115,11 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 
   PAD_LOCK (pad);
 
+  if (pad->priv->flow_return != GST_FLOW_OK) {
+    PAD_UNLOCK (pad);
+    return NULL;
+  }
+
   gst_aggregator_pad_clip_buffer_unlocked (pad);
 
   if (pad->priv->clipped_buffer) {
@@ -2937,6 +3133,31 @@ gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
 }
 
 /**
+ * gst_aggregator_pad_has_buffer:
+ * @pad: the pad to check the buffer on
+ *
+ * This checks if a pad has a buffer available that will be returned by
+ * a call to gst_aggregator_pad_peek_buffer() or
+ * gst_aggregator_pad_pop_buffer().
+ *
+ * Returns: %TRUE if the pad has a buffer available as the next thing.
+ *
+ * Since: 1.14.1
+ */
+gboolean
+gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
+{
+  gboolean has_buffer;
+
+  PAD_LOCK (pad);
+  gst_aggregator_pad_clip_buffer_unlocked (pad);
+  has_buffer = (pad->priv->clipped_buffer != NULL);
+  PAD_UNLOCK (pad);
+
+  return has_buffer;
+}
+
+/**
  * gst_aggregator_pad_is_eos:
  * @pad: an aggregator pad
  *
@@ -3080,3 +3301,40 @@ gst_aggregator_get_allocator (GstAggregator * self,
   if (params)
     *params = self->priv->allocation_params;
 }
+
+/**
+ * gst_aggregator_simple_get_next_time:
+ * @self: A #GstAggregator
+ *
+ * This is a simple #GstAggregator::get_next_time implementation that
+ * just looks at the #GstSegment on the srcpad of the aggregator and bases
+ * the next time on the running time there.
+ *
+ * This is the desired behaviour in most cases where you have a live source
+ * and you have a dead line based aggregator subclass.
+ *
+ * Returns: The running time based on the position
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_aggregator_simple_get_next_time (GstAggregator * self)
+{
+  GstClockTime next_time;
+  GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
+  GstSegment *segment = &srcpad->segment;
+
+  GST_OBJECT_LOCK (self);
+  if (segment->position == -1 || segment->position < segment->start)
+    next_time = segment->start;
+  else
+    next_time = segment->position;
+
+  if (segment->stop != -1 && next_time > segment->stop)
+    next_time = segment->stop;
+
+  next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+  GST_OBJECT_UNLOCK (self);
+
+  return next_time;
+}