From d74efc1aed7006c35dff1932b431873b1f697c87 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 30 Jun 2020 21:10:05 +0200 Subject: [PATCH] aggregator: expose sample selection API See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/771 for context. This exposes new API that subclasses must call from their aggregate() implementation to signal that they have selected the next samples they will aggregate: gst_aggregator_selected_samples() GstAggregator will emit a new signal there, `samples-selected`, handlers can then look up samples per pad with the newly-added gst_aggregator_peek_next_sample. In addition, a new FIXME is logged when subclasses haven't actually called `selected_samples` from their aggregate() implementation. Part-of: --- libs/gst/base/gstaggregator.c | 99 ++++++++++++++++++++++++++++++++++++++++++- libs/gst/base/gstaggregator.h | 18 +++++++- 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index faf7831..951cb7b 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -297,6 +297,31 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg) return TRUE; } +/** + * gst_aggregator_peek_next_sample: + * + * Use this function to determine what input buffers will be aggregated + * to produce the next output buffer. This should only be called from + * a #GstAggregator::samples-selected handler, and can be used to precisely + * control aggregating parameters for a given set of input samples. + * + * Returns: The sample that is about to be aggregated. It may hold a #GstBuffer + * or a #GstBufferList. The contents of its info structure is subclass-dependent, + * and documented on a subclass basis. The buffers held by the sample are + * not writable. + * Since: 1.18 + */ +GstSample * +gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad) +{ + GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg); + + if (klass->peek_next_sample) + return (klass->peek_next_sample (agg, aggpad)); + + return NULL; +} + /************************************* * GstAggregator implementation * *************************************/ @@ -339,6 +364,7 @@ struct _GstAggregatorPrivate /* aggregate */ GstClockID aggregate_id; /* protected by src_lock */ + gboolean selected_samples_called_or_warned; /* protected by src_lock */ GMutex src_lock; GCond src_cond; @@ -354,6 +380,7 @@ struct _GstAggregatorPrivate /* properties */ gint64 latency; /* protected by both src_lock and all pad locks */ + gboolean emit_signals; }; /* Seek event forwarding helper */ @@ -373,6 +400,7 @@ typedef struct #define DEFAULT_MIN_UPSTREAM_LATENCY 0 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME (-1) +#define DEFAULT_EMIT_SIGNALS FALSE enum { @@ -381,9 +409,18 @@ enum PROP_MIN_UPSTREAM_LATENCY, PROP_START_TIME_SELECTION, PROP_START_TIME, + PROP_EMIT_SIGNALS, PROP_LAST }; +enum +{ + SIGNAL_SAMPLES_SELECTED, + LAST_SIGNAL, +}; + +static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 }; + static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head); @@ -1282,6 +1319,13 @@ gst_aggregator_aggregate_func (GstAggregator * self) flow_return = klass->aggregate (self, timeout); } + if (!priv->selected_samples_called_or_warned) { + GST_FIXME_OBJECT (self, + "Subclass should call gst_aggregator_selected_samples() from its " + "aggregate implementation."); + priv->selected_samples_called_or_warned = TRUE; + } + if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA) continue; @@ -2551,6 +2595,9 @@ gst_aggregator_set_property (GObject * object, guint prop_id, case PROP_START_TIME: agg->priv->start_time = g_value_get_uint64 (value); break; + case PROP_EMIT_SIGNALS: + agg->priv->emit_signals = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2578,6 +2625,9 @@ gst_aggregator_get_property (GObject * object, guint prop_id, case PROP_START_TIME: g_value_set_uint64 (value, agg->priv->start_time); break; + case PROP_EMIT_SIGNALS: + g_value_set_boolean (value, agg->priv->emit_signals); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2670,6 +2720,31 @@ gst_aggregator_class_init (GstAggregatorClass * klass) "Start time to use if start-time-selection=set", 0, G_MAXUINT64, DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregator:emit-signals: + * + * Enables the emission of signals such as #GstAggregator::samples-selected + * + * Since: 1.18 + */ + g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS, + g_param_spec_boolean ("emit-signals", "Emit signals", + "Send signals", DEFAULT_EMIT_SIGNALS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregator::samples-selected: + * + * Signals that the #GstAggregator subclass has selected the next set + * of input samples it will aggregate. Handlers may call + * gst_aggregator_peek_next_sample() at that point. + * + * Since: 1.18 + */ + gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] = + g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0); } static inline gpointer @@ -3153,7 +3228,6 @@ static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer) { pad->priv->num_buffers--; - GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer); if (buffer && pad->priv->emit_signals) { g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], 0, buffer); @@ -3526,3 +3600,26 @@ gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment) self->priv->send_segment = TRUE; GST_OBJECT_UNLOCK (self); } + +/** + * gst_aggregator_selected_samples: + * + * Subclasses should call this when they have prepared the + * buffers they will aggregate for each of their sink pads, but + * before using any of the properties of the pads that govern + * *how* aggregation should be performed, for example z-index + * for video aggregators. + * + * Since: 1.18 + */ +void +gst_aggregator_selected_samples (GstAggregator * self) +{ + g_return_if_fail (GST_IS_AGGREGATOR (self)); + + if (self->priv->emit_signals) { + g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0); + } + + self->priv->selected_samples_called_or_warned = TRUE; +} diff --git a/libs/gst/base/gstaggregator.h b/libs/gst/base/gstaggregator.h index 57e3f8f..fa641a8 100644 --- a/libs/gst/base/gstaggregator.h +++ b/libs/gst/base/gstaggregator.h @@ -340,9 +340,18 @@ struct _GstAggregatorClass { */ GstFlowReturn (*finish_buffer_list) (GstAggregator * aggregator, GstBufferList * bufferlist); + /** + * GstAggregatorClass::peek_next_sample: + * + * See gst_aggregator_peek_next_sample(). + * + * Since: 1.18 + */ + GstSample * (*peek_next_sample) (GstAggregator *aggregator, + GstAggregatorPad * aggregator_pad); /*< private >*/ - gpointer _gst_reserved[GST_PADDING_LARGE-4]; + gpointer _gst_reserved[GST_PADDING_LARGE-5]; }; /************************************ @@ -404,6 +413,13 @@ GST_BASE_API void gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment); +GST_BASE_API +GstSample * gst_aggregator_peek_next_sample (GstAggregator *self, + GstAggregatorPad * pad); + +GST_BASE_API +void gst_aggregator_selected_samples (GstAggregator * self); + /** * GstAggregatorStartTimeSelection: * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0. -- 2.7.4