aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
index 10d5caf..94d4816 100644 (file)
@@ -130,7 +130,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
 
 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
 
-static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
+    GstBuffer * buffer);
 
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
@@ -261,6 +262,9 @@ struct _GstAggregatorPadPrivate
    * the chain function is also happening.
    */
   GMutex flush_lock;
+
+  /* properties */
+  gboolean emit_signals;
 };
 
 /* Must be called with PAD_LOCK held */
@@ -291,7 +295,7 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   PAD_UNLOCK (aggpad);
 
   if (klass->flush)
-    return klass->flush (aggpad, agg);
+    return (klass->flush (aggpad, agg) == GST_FLOW_OK);
 
   return TRUE;
 }
@@ -817,7 +821,7 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
     if (GST_IS_BUFFER (item->data)
         && klass->skip_buffer (aggpad, agg, item->data)) {
       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
-      gst_aggregator_pad_buffer_consumed (aggpad);
+      gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
       gst_buffer_unref (item->data);
       g_queue_delete_link (&aggpad->priv->data, item);
     } else {
@@ -1471,7 +1475,6 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       GstBuffer *gapbuf;
 
       gst_event_parse_gap (event, &pts, &duration);
-      gapbuf = gst_buffer_new ();
 
       if (GST_CLOCK_TIME_IS_VALID (duration))
         endpts = pts + duration;
@@ -1493,14 +1496,17 @@ gst_aggregator_default_sink_event (GstAggregator * self,
       else
         duration = GST_CLOCK_TIME_NONE;
 
+      gapbuf = gst_buffer_new ();
       GST_BUFFER_PTS (gapbuf) = pts;
       GST_BUFFER_DURATION (gapbuf) = duration;
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
 
       /* Remove GAP event so we can replace it with the buffer */
+      PAD_LOCK (aggpad);
       if (g_queue_peek_tail (&aggpad->priv->data) == event)
         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+      PAD_UNLOCK (aggpad);
 
       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
           GST_FLOW_OK) {
@@ -1663,17 +1669,26 @@ gst_aggregator_default_create_new_pad (GstAggregator * self,
 
   GST_OBJECT_LOCK (self);
   if (req_name == NULL || strlen (req_name) < 6
-      || !g_str_has_prefix (req_name, "sink_")) {
+      || !g_str_has_prefix (req_name, "sink_")
+      || strrchr (req_name, '%') != NULL) {
     /* no name given when requesting the pad, use next available int */
     serial = ++priv->max_padserial;
   } else {
+    gchar *endptr = NULL;
+
     /* parse serial number from requested padname */
-    serial = g_ascii_strtoull (&req_name[5], NULL, 10);
-    if (serial > priv->max_padserial)
-      priv->max_padserial = serial;
+    serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
+    if (endptr != NULL && *endptr == '\0') {
+      if (serial > priv->max_padserial) {
+        priv->max_padserial = serial;
+      }
+    } else {
+      serial = ++priv->max_padserial;
+    }
   }
 
   name = g_strdup_printf ("sink_%u", serial);
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
   agg_pad = g_object_new (pad_type,
       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
   g_free (name);
@@ -2416,6 +2431,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 {
   GstPadTemplate *pad_template;
   GstAggregatorPrivate *priv;
+  GType pad_type;
 
   g_return_if_fail (klass->aggregate != NULL);
 
@@ -2435,7 +2451,14 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
   self->priv->has_peer_latency = FALSE;
 
-  self->srcpad = gst_pad_new_from_template (pad_template, "src");
+  pad_type =
+      GST_PAD_TEMPLATE_GTYPE (pad_template) ==
+      G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
+      GST_PAD_TEMPLATE_GTYPE (pad_template);
+  g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+  self->srcpad =
+      g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
+      "template", pad_template, NULL);
 
   gst_aggregator_reset_flow_values (self);
 
@@ -2819,6 +2842,22 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
  ************************************/
 G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
 
+#define DEFAULT_PAD_EMIT_SIGNALS FALSE
+
+enum
+{
+  PAD_PROP_0,
+  PAD_PROP_EMIT_SIGNALS,
+};
+
+enum
+{
+  PAD_SIGNAL_BUFFER_CONSUMED,
+  PAD_LAST_SIGNAL,
+};
+
+static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
+
 static void
 gst_aggregator_pad_constructed (GObject * object)
 {
@@ -2859,6 +2898,38 @@ gst_aggregator_pad_dispose (GObject * object)
 }
 
 static void
+gst_aggregator_pad_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      pad->priv->emit_signals = g_value_get_boolean (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_aggregator_pad_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+  switch (prop_id) {
+    case PAD_PROP_EMIT_SIGNALS:
+      g_value_set_boolean (value, pad->priv->emit_signals);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) klass;
@@ -2866,6 +2937,35 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
   gobject_class->constructed = gst_aggregator_pad_constructed;
   gobject_class->finalize = gst_aggregator_pad_finalize;
   gobject_class->dispose = gst_aggregator_pad_dispose;
+  gobject_class->set_property = gst_aggregator_pad_set_property;
+  gobject_class->get_property = gst_aggregator_pad_get_property;
+
+  /**
+   * GstAggregatorPad:buffer-consumed:
+   *
+   * Signals that a buffer was consumed. As aggregator pads store buffers
+   * in an internal queue, there is no direct match between input and output
+   * buffers at any given time. This signal can be useful to forward metas
+   * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
+   *
+   * Since: 1.16
+   */
+  gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
+      g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
+      G_TYPE_NONE, 1, GST_TYPE_BUFFER);
+
+  /**
+   * GstAggregatorPad:emit-signals:
+   *
+   * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
+      g_param_spec_boolean ("emit-signals", "Emit signals",
+          "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 }
 
 static void
@@ -2881,14 +2981,19 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
 
   gst_aggregator_pad_reset_unlocked (pad);
   pad->priv->negotiated = FALSE;
+  pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
 }
 
 /* Must be called with the PAD_LOCK held */
 static void
-gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
 {
   pad->priv->num_buffers--;
-  GST_TRACE_OBJECT (pad, "Consuming buffer");
+  GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
+  if (buffer && pad->priv->emit_signals) {
+    g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
+        0, buffer);
+  }
   PAD_BROADCAST_EVENT (pad);
 }
 
@@ -2925,7 +3030,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
       buffer = aggclass->clip (self, pad, buffer);
 
       if (buffer == NULL) {
-        gst_aggregator_pad_buffer_consumed (pad);
+        gst_aggregator_pad_buffer_consumed (pad, buffer);
         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
       }
     }
@@ -2964,7 +3069,7 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
 
   if (buffer) {
     pad->priv->clipped_buffer = NULL;
-    gst_aggregator_pad_buffer_consumed (pad);
+    gst_aggregator_pad_buffer_consumed (pad, buffer);
     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
   }
 
@@ -3203,7 +3308,7 @@ gst_aggregator_get_allocator (GstAggregator * self,
  *
  * This is a simple #GstAggregator::get_next_time implementation that
  * just looks at the #GstSegment on the srcpad of the aggregator and bases
- * the next time on the running there there.
+ * the next time on the running time there.
  *
  * This is the desired behaviour in most cases where you have a live source
  * and you have a dead line based aggregator subclass.