aggregator: expose sample selection API
authorMathieu Duponchelle <mathieu@centricular.com>
Tue, 30 Jun 2020 19:10:05 +0000 (21:10 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 31 Jul 2020 06:59:08 +0000 (09:59 +0300)
See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/771
for context.

This exposes new API that subclasses must call from their
aggregate() implementation to signal that they have selected
the next samples they will aggregate: gst_aggregator_selected_samples()

GstAggregator will emit a new signal there, `samples-selected`,
handlers can then look up samples per pad with the newly-added
gst_aggregator_peek_next_sample.

In addition, a new FIXME is logged when subclasses haven't actually
called `selected_samples` from their aggregate() implementation.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/549>

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index faf7831..951cb7b 100644 (file)
@@ -297,6 +297,31 @@ gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
   return TRUE;
 }
 
+/**
+ * gst_aggregator_peek_next_sample:
+ *
+ * Use this function to determine what input buffers will be aggregated
+ * to produce the next output buffer. This should only be called from
+ * a #GstAggregator::samples-selected handler, and can be used to precisely
+ * control aggregating parameters for a given set of input samples.
+ *
+ * Returns: The sample that is about to be aggregated. It may hold a #GstBuffer
+ *   or a #GstBufferList. The contents of its info structure is subclass-dependent,
+ *   and documented on a subclass basis. The buffers held by the sample are
+ *   not writable.
+ * Since: 1.18
+ */
+GstSample *
+gst_aggregator_peek_next_sample (GstAggregator * agg, GstAggregatorPad * aggpad)
+{
+  GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (agg);
+
+  if (klass->peek_next_sample)
+    return (klass->peek_next_sample (agg, aggpad));
+
+  return NULL;
+}
+
 /*************************************
  * GstAggregator implementation  *
  *************************************/
@@ -339,6 +364,7 @@ struct _GstAggregatorPrivate
 
   /* aggregate */
   GstClockID aggregate_id;      /* protected by src_lock */
+  gboolean selected_samples_called_or_warned;   /* protected by src_lock */
   GMutex src_lock;
   GCond src_cond;
 
@@ -354,6 +380,7 @@ struct _GstAggregatorPrivate
 
   /* properties */
   gint64 latency;               /* protected by both src_lock and all pad locks */
+  gboolean emit_signals;
 };
 
 /* Seek event forwarding helper */
@@ -373,6 +400,7 @@ typedef struct
 #define DEFAULT_MIN_UPSTREAM_LATENCY              0
 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
 #define DEFAULT_START_TIME           (-1)
+#define DEFAULT_EMIT_SIGNALS         FALSE
 
 enum
 {
@@ -381,9 +409,18 @@ enum
   PROP_MIN_UPSTREAM_LATENCY,
   PROP_START_TIME_SELECTION,
   PROP_START_TIME,
+  PROP_EMIT_SIGNALS,
   PROP_LAST
 };
 
+enum
+{
+  SIGNAL_SAMPLES_SELECTED,
+  LAST_SIGNAL,
+};
+
+static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };
+
 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
 
@@ -1282,6 +1319,13 @@ gst_aggregator_aggregate_func (GstAggregator * self)
       flow_return = klass->aggregate (self, timeout);
     }
 
+    if (!priv->selected_samples_called_or_warned) {
+      GST_FIXME_OBJECT (self,
+          "Subclass should call gst_aggregator_selected_samples() from its "
+          "aggregate implementation.");
+      priv->selected_samples_called_or_warned = TRUE;
+    }
+
     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
       continue;
 
@@ -2551,6 +2595,9 @@ gst_aggregator_set_property (GObject * object, guint prop_id,
     case PROP_START_TIME:
       agg->priv->start_time = g_value_get_uint64 (value);
       break;
+    case PROP_EMIT_SIGNALS:
+      agg->priv->emit_signals = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2578,6 +2625,9 @@ gst_aggregator_get_property (GObject * object, guint prop_id,
     case PROP_START_TIME:
       g_value_set_uint64 (value, agg->priv->start_time);
       break;
+    case PROP_EMIT_SIGNALS:
+      g_value_set_boolean (value, agg->priv->emit_signals);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2670,6 +2720,31 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           "Start time to use if start-time-selection=set", 0,
           G_MAXUINT64,
           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAggregator:emit-signals:
+   *
+   * Enables the emission of signals such as #GstAggregator::samples-selected
+   *
+   * Since: 1.18
+   */
+  g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
+      g_param_spec_boolean ("emit-signals", "Emit signals",
+          "Send signals", DEFAULT_EMIT_SIGNALS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAggregator::samples-selected:
+   *
+   * Signals that the #GstAggregator subclass has selected the next set
+   * of input samples it will aggregate. Handlers may call
+   * gst_aggregator_peek_next_sample() at that point.
+   *
+   * Since: 1.18
+   */
+  gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED] =
+      g_signal_new ("samples-selected", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_FIRST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
 }
 
 static inline gpointer
@@ -3153,7 +3228,6 @@ static void
 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
 {
   pad->priv->num_buffers--;
-  GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
   if (buffer && pad->priv->emit_signals) {
     g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
         0, buffer);
@@ -3526,3 +3600,26 @@ gst_aggregator_update_segment (GstAggregator * self, const GstSegment * segment)
   self->priv->send_segment = TRUE;
   GST_OBJECT_UNLOCK (self);
 }
+
+/**
+ * gst_aggregator_selected_samples:
+ *
+ * Subclasses should call this when they have prepared the
+ * buffers they will aggregate for each of their sink pads, but
+ * before using any of the properties of the pads that govern
+ * *how* aggregation should be performed, for example z-index
+ * for video aggregators.
+ *
+ * Since: 1.18
+ */
+void
+gst_aggregator_selected_samples (GstAggregator * self)
+{
+  g_return_if_fail (GST_IS_AGGREGATOR (self));
+
+  if (self->priv->emit_signals) {
+    g_signal_emit (self, gst_aggregator_signals[SIGNAL_SAMPLES_SELECTED], 0);
+  }
+
+  self->priv->selected_samples_called_or_warned = TRUE;
+}
index 57e3f8f..fa641a8 100644 (file)
@@ -340,9 +340,18 @@ struct _GstAggregatorClass {
    */
   GstFlowReturn     (*finish_buffer_list) (GstAggregator    * aggregator,
                                            GstBufferList    * bufferlist);
+  /**
+   * GstAggregatorClass::peek_next_sample:
+   *
+   * See gst_aggregator_peek_next_sample().
+   *
+   * Since: 1.18
+   */
+  GstSample *       (*peek_next_sample)         (GstAggregator *aggregator,
+                                                 GstAggregatorPad * aggregator_pad);
 
   /*< private >*/
-  gpointer          _gst_reserved[GST_PADDING_LARGE-4];
+  gpointer          _gst_reserved[GST_PADDING_LARGE-5];
 };
 
 /************************************
@@ -404,6 +413,13 @@ GST_BASE_API
 void            gst_aggregator_update_segment       (GstAggregator                * self,
                                                      const GstSegment             * segment);
 
+GST_BASE_API
+GstSample     * gst_aggregator_peek_next_sample     (GstAggregator *self,
+                                                     GstAggregatorPad * pad);
+
+GST_BASE_API
+void            gst_aggregator_selected_samples     (GstAggregator                * self);
+
 /**
  * GstAggregatorStartTimeSelection:
  * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.