From ae57b1c76d6f5ddd5b38f9be2df4a66bfcd574b5 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Mon, 1 Apr 2019 12:22:49 +0200 Subject: [PATCH] aggregator: add buffer-consumed pad signal The signal will be emitted when a buffer was consumed on a pad, if the newly-added "emit-signals" property has been set to TRUE. Handlers connected to the signal will receive a valid reference on the consumed buffer, allowing for example the retrieval of metas in order to forward them once an output buffer is pushed out. --- libs/gst/base/gstaggregator.c | 98 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index 0acd8a2..3228193 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -130,7 +130,8 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg); static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self); -static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad); +static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, + GstBuffer * buffer); GST_DEBUG_CATEGORY_STATIC (aggregator_debug); #define GST_CAT_DEFAULT aggregator_debug @@ -261,6 +262,9 @@ struct _GstAggregatorPadPrivate * the chain function is also happening. */ GMutex flush_lock; + + /* properties */ + gboolean emit_signals; }; /* Must be called with PAD_LOCK held */ @@ -817,7 +821,7 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad, if (GST_IS_BUFFER (item->data) && klass->skip_buffer (aggpad, agg, item->data)) { GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data); - gst_aggregator_pad_buffer_consumed (aggpad); + gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data)); gst_buffer_unref (item->data); g_queue_delete_link (&aggpad->priv->data, item); } else { @@ -2821,6 +2825,22 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, ************************************/ G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); +#define DEFAULT_PAD_EMIT_SIGNALS FALSE + +enum +{ + PAD_PROP_0, + PAD_PROP_EMIT_SIGNALS, +}; + +enum +{ + PAD_SIGNAL_BUFFER_CONSUMED, + PAD_LAST_SIGNAL, +}; + +static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 }; + static void gst_aggregator_pad_constructed (GObject * object) { @@ -2861,6 +2881,38 @@ gst_aggregator_pad_dispose (GObject * object) } static void +gst_aggregator_pad_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + pad->priv->emit_signals = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_aggregator_pad_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object); + + switch (prop_id) { + case PAD_PROP_EMIT_SIGNALS: + g_value_set_boolean (value, pad->priv->emit_signals); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; @@ -2868,6 +2920,35 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) gobject_class->constructed = gst_aggregator_pad_constructed; gobject_class->finalize = gst_aggregator_pad_finalize; gobject_class->dispose = gst_aggregator_pad_dispose; + gobject_class->set_property = gst_aggregator_pad_set_property; + gobject_class->get_property = gst_aggregator_pad_get_property; + + /** + * GstAggregatorPad::buffer-consumed: + * + * Signals that a buffer was consumed. As aggregator pads store buffers + * in an internal queue, there is no direct match between input and output + * buffers at any given time. This signal can be useful to forward metas + * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time. + * + * Since: 1.16 + */ + gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] = + g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_TYPE_NONE, 1, GST_TYPE_BUFFER); + + /** + * GstAggregatorPad:emit-signals: + * + * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed + * + * Since: 1.16 + */ + g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS, + g_param_spec_boolean ("emit-signals", "Emit signals", + "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static void @@ -2883,14 +2964,19 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) gst_aggregator_pad_reset_unlocked (pad); pad->priv->negotiated = FALSE; + pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS; } /* Must be called with the PAD_LOCK held */ static void -gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad) +gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) { pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer"); + GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer); + if (buffer && pad->priv->emit_signals) { + g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], + 0, buffer); + } PAD_BROADCAST_EVENT (pad); } @@ -2927,7 +3013,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); } } @@ -2966,7 +3052,7 @@ gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad) if (buffer) { pad->priv->clipped_buffer = NULL; - gst_aggregator_pad_buffer_consumed (pad); + gst_aggregator_pad_buffer_consumed (pad, buffer); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); } -- 2.7.4