aggregator: Implement force_live API
authorMathieu Duponchelle <mathieu@centricular.com>
Fri, 18 Nov 2022 13:24:30 +0000 (14:24 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 18 Nov 2022 18:14:26 +0000 (18:14 +0000)
Setting force_live lets aggregator behave as if it had at least one of
its sinks connected to a live source, which should let us get rid of the
fake live test source hack that is probably present in dozens of
applications by now.

+ Expose API for subclasses to set and get force_live
+ Expose force-live properties in GstVideoAggregator and GstAudioAggregator
+ Adds a simple test

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3008>

subprojects/gst-plugins-base/gst-libs/gst/audio/gstaudioaggregator.c
subprojects/gst-plugins-base/gst-libs/gst/video/gstvideoaggregator.c
subprojects/gstreamer/libs/gst/base/gstaggregator.c
subprojects/gstreamer/libs/gst/base/gstaggregator.h
subprojects/gstreamer/tests/check/libs/aggregator.c

index a811c47..784c047 100644 (file)
@@ -537,6 +537,7 @@ static GstSample *gst_audio_aggregator_peek_next_sample (GstAggregator * agg,
 #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
 #define DEFAULT_OUTPUT_BUFFER_DURATION_N (1)
 #define DEFAULT_OUTPUT_BUFFER_DURATION_D (100)
+#define DEFAULT_FORCE_LIVE FALSE
 
 enum
 {
@@ -546,6 +547,7 @@ enum
   PROP_DISCONT_WAIT,
   PROP_OUTPUT_BUFFER_DURATION_FRACTION,
   PROP_IGNORE_INACTIVE_PADS,
+  PROP_FORCE_LIVE,
 };
 
 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
@@ -728,6 +730,23 @@ gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass)
           "Ignore inactive pads",
           "Avoid timing out waiting for inactive pads", FALSE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstAudioAggregator:force-live:
+   *
+   * Causes the element to aggregate on a timeout even when no live source is
+   * connected to its sinks. See #GstAggregator:min-upstream-latency for a
+   * companion property: in the vast majority of cases where you plan to plug in
+   * live sources with a non-zero latency, you should set it to a non-zero value.
+   *
+   * Since: 1.22
+   */
+  g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
+      g_param_spec_boolean ("force-live", "Force live",
+          "Always operate in live mode and aggregate on timeout regardless of "
+          "whether any live sources are linked upstream",
+          DEFAULT_FORCE_LIVE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
 }
 
 static void
@@ -797,6 +816,10 @@ gst_audio_aggregator_set_property (GObject * object, guint prop_id,
       gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object),
           g_value_get_boolean (value));
       break;
+    case PROP_FORCE_LIVE:
+      gst_aggregator_set_force_live (GST_AGGREGATOR (object),
+          g_value_get_boolean (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -829,6 +852,10 @@ gst_audio_aggregator_get_property (GObject * object, guint prop_id,
       g_value_set_boolean (value,
           gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object)));
       break;
+    case PROP_FORCE_LIVE:
+      g_value_set_boolean (value,
+          gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2195,7 +2222,7 @@ gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout)
   gint64 next_timestamp;
   gint rate, bpf;
   gboolean dropped = FALSE;
-  gboolean is_eos = TRUE;
+  gboolean is_eos = !gst_aggregator_get_force_live (agg);
   gboolean is_done = TRUE;
   guint blocksize;
   GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
index 31e9d70..ffbf69d 100644 (file)
@@ -955,7 +955,13 @@ static void
         g_thread_self());                                      \
   } G_STMT_END
 
+enum
+{
+  PROP_0,
+  PROP_FORCE_LIVE,
+};
 
+#define DEFAULT_FORCE_LIVE              FALSE
 
 /* Can't use the G_DEFINE_TYPE macros because we need the
  * videoaggregator class in the _init to be able to set
@@ -1717,7 +1723,7 @@ gst_video_aggregator_fill_queues (GstVideoAggregator * vagg,
     GstClockTime output_end_running_time, gboolean timeout)
 {
   GList *l;
-  gboolean eos = TRUE;
+  gboolean eos = !gst_aggregator_get_force_live (GST_AGGREGATOR (vagg));
   gboolean repeat_pad_eos = FALSE;
   gboolean has_no_repeat_pads = FALSE;
   gboolean need_more_data = FALSE;
@@ -3002,6 +3008,10 @@ gst_video_aggregator_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec)
 {
   switch (prop_id) {
+    case PROP_FORCE_LIVE:
+      g_value_set_boolean (value,
+          gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -3013,6 +3023,10 @@ gst_video_aggregator_set_property (GObject * object,
     guint prop_id, const GValue * value, GParamSpec * pspec)
 {
   switch (prop_id) {
+    case PROP_FORCE_LIVE:
+      gst_aggregator_set_force_live (GST_AGGREGATOR (object),
+          g_value_get_boolean (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -3070,6 +3084,23 @@ gst_video_aggregator_class_init (GstVideoAggregatorClass * klass)
 
   /* Register the pad class */
   g_type_class_ref (GST_TYPE_VIDEO_AGGREGATOR_PAD);
+
+  /**
+   * GstVideoAggregator:force-live:
+   *
+   * Causes the element to aggregate on a timeout even when no live source is
+   * connected to its sinks. See #GstAggregator:min-upstream-latency for a
+   * companion property: in the vast majority of cases where you plan to plug in
+   * live sources with a non-zero latency, you should set it to a non-zero value.
+   *
+   * Since: 1.22
+   */
+  g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
+      g_param_spec_boolean ("force-live", "Force live",
+          "Always operate in live mode and aggregate on timeout regardless of "
+          "whether any live sources are linked upstream",
+          DEFAULT_FORCE_LIVE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
 }
 
 static void
index d51ac56..abd7110 100644 (file)
@@ -371,6 +371,7 @@ struct _GstAggregatorPrivate
   gboolean send_segment;
   gboolean flushing;
   gboolean send_eos;            /* protected by srcpad stream lock */
+  gboolean got_eos_event;       /* protected by srcpad stream lock */
 
   GstCaps *srccaps;             /* protected by the srcpad stream lock */
 
@@ -409,8 +410,16 @@ struct _GstAggregatorPrivate
   gint64 latency;               /* protected by both src_lock and all pad locks */
   gboolean emit_signals;
   gboolean ignore_inactive_pads;
+  gboolean force_live;          /* Construct only, doesn't need any locking */
 };
 
+/* With SRC_LOCK */
+static gboolean
+is_live_unlocked (GstAggregator * self)
+{
+  return self->priv->peer_latency_live || self->priv->force_live;
+}
+
 /* Seek event forwarding helper */
 typedef struct
 {
@@ -429,6 +438,7 @@ typedef struct
 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
 #define DEFAULT_START_TIME           (-1)
 #define DEFAULT_EMIT_SIGNALS         FALSE
+#define DEFAULT_FORCE_LIVE           FALSE
 
 enum
 {
@@ -501,7 +511,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
       break;
     }
 
-    if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
+    if (self->priv->ignore_inactive_pads && is_live_unlocked (self) &&
         pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
       PAD_UNLOCK (pad);
       GST_LOG_OBJECT (pad, "Ignoring inactive pad");
@@ -528,7 +538,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
     } else {
       GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers",
           GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers);
-      if (self->priv->peer_latency_live) {
+      if (is_live_unlocked (self)) {
         /* In live mode, having a single pad with buffers is enough to
          * generate a start time from it. In non-live mode all pads need
          * to have a buffer
@@ -541,7 +551,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
     PAD_UNLOCK (pad);
   }
 
-  if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+  if (self->priv->ignore_inactive_pads && is_live_unlocked (self)
       && n_ready == 0)
     goto no_sinkpads;
 
@@ -1388,10 +1398,15 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
       goto handle_error;
 
-    if (self->priv->peer_latency_live)
+    if (is_live_unlocked (self))
       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
           gst_aggregator_pad_skip_buffers, NULL);
 
+    if (self->priv->got_eos_event) {
+      gst_aggregator_push_eos (self);
+      continue;
+    }
+
     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
      * the queue */
     if (!gst_aggregator_wait_and_check (self, &timeout)) {
@@ -1478,6 +1493,7 @@ gst_aggregator_start (GstAggregator * self)
   self->priv->send_stream_start = TRUE;
   self->priv->send_segment = TRUE;
   self->priv->send_eos = TRUE;
+  self->priv->got_eos_event = FALSE;
   self->priv->srccaps = NULL;
 
   self->priv->has_peer_latency = FALSE;
@@ -1699,6 +1715,7 @@ gst_aggregator_default_sink_event (GstAggregator * self,
         event = NULL;
         SRC_LOCK (self);
         priv->send_eos = TRUE;
+        priv->got_eos_event = FALSE;
         SRC_BROADCAST (self);
         SRC_UNLOCK (self);
 
@@ -1977,6 +1994,14 @@ gst_aggregator_change_state (GstElement * element, GstStateChange transition)
       SRC_LOCK (self);
       SRC_BROADCAST (self);
       SRC_UNLOCK (self);
+      if (self->priv->force_live) {
+        ret = GST_STATE_CHANGE_NO_PREROLL;
+      }
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:
+      if (self->priv->force_live) {
+        ret = GST_STATE_CHANGE_NO_PREROLL;
+      }
       break;
     default:
       break;
@@ -2195,12 +2220,18 @@ gst_aggregator_get_latency_unlocked (GstAggregator * self)
 
     ret = gst_aggregator_query_latency_unlocked (self, query);
     gst_query_unref (query);
-    if (!ret)
+    /* If we've been set to live, we don't wait for a peer latency, we will
+     * simply query it again next time around */
+    if (!ret && !self->priv->force_live)
       return GST_CLOCK_TIME_NONE;
   }
 
-  if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
-    return GST_CLOCK_TIME_NONE;
+  /* If we've been set to live, we don't wait for a peer latency, we will
+   * simply query it again next time around */
+  if (!self->priv->force_live) {
+    if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
+      return GST_CLOCK_TIME_NONE;
+  }
 
   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
   latency = self->priv->peer_latency_min;
@@ -2262,8 +2293,17 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event)
 
     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
   }
+
   GST_STATE_UNLOCK (element);
 
+
+  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+    SRC_LOCK (self);
+    self->priv->got_eos_event = TRUE;
+    SRC_BROADCAST (self);
+    SRC_UNLOCK (self);
+  }
+
   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
       event);
 }
@@ -2628,6 +2668,16 @@ flushing:
 }
 
 static void
+gst_aggregator_constructed (GObject * object)
+{
+  GstAggregator *agg = GST_AGGREGATOR (object);
+
+  if (agg->priv->force_live) {
+    GST_OBJECT_FLAG_SET (agg, GST_ELEMENT_FLAG_SOURCE);
+  }
+}
+
+static void
 gst_aggregator_finalize (GObject * object)
 {
   GstAggregator *self = (GstAggregator *) object;
@@ -2818,6 +2868,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
 
   gobject_class->set_property = gst_aggregator_set_property;
   gobject_class->get_property = gst_aggregator_get_property;
+  gobject_class->constructed = gst_aggregator_constructed;
   gobject_class->finalize = gst_aggregator_finalize;
 
   g_object_class_install_property (gobject_class, PROP_LATENCY,
@@ -2955,6 +3006,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   self->priv->latency = DEFAULT_LATENCY;
   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
   self->priv->start_time = DEFAULT_START_TIME;
+  self->priv->force_live = DEFAULT_FORCE_LIVE;
 
   g_mutex_init (&self->priv->src_lock);
   g_cond_init (&self->priv->src_cond);
@@ -3007,7 +3059,7 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
 
   /* We also want at least two buffers, one is being processed and one is ready
    * for the next iteration when we operate in live mode. */
-  if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
+  if (is_live_unlocked (self) && aggpad->priv->num_buffers < 2)
     return TRUE;
 
   /* On top of our latency, we also want to allow buffering up to the
@@ -3648,7 +3700,7 @@ gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
   g_assert_nonnull (self);
 
   PAD_LOCK (pad);
-  inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+  inactive = self->priv->ignore_inactive_pads && is_live_unlocked (self)
       && pad->priv->first_buffer;
   PAD_UNLOCK (pad);
 
@@ -3933,3 +3985,33 @@ gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
 
   return ret;
 }
+
+/**
+ * gst_aggregator_get_force_live:
+ *
+ * Subclasses may use the return value to inform whether they should return
+ * %GST_FLOW_EOS from their aggregate implementation.
+ *
+ * Returns: whether live status was forced on @self.
+ *
+ * Since: 1.22
+ */
+gboolean
+gst_aggregator_get_force_live (GstAggregator * self)
+{
+  return self->priv->force_live;
+}
+
+/**
+ * gst_aggregator_set_force_live:
+ *
+ * Subclasses should call this at construction time in order for @self to
+ * aggregate on a timeout even when no live source is connected.
+ *
+ * Since: 1.22
+ */
+void
+gst_aggregator_set_force_live (GstAggregator * self, gboolean force_live)
+{
+  self->priv->force_live = force_live;
+}
index 45ced12..bbac9e4 100644 (file)
@@ -434,6 +434,13 @@ void            gst_aggregator_set_ignore_inactive_pads (GstAggregator * self,
 GST_BASE_API
 gboolean        gst_aggregator_get_ignore_inactive_pads (GstAggregator * self);
 
+GST_BASE_API
+gboolean        gst_aggregator_get_force_live       (GstAggregator *self);
+
+GST_BASE_API
+void            gst_aggregator_set_force_live       (GstAggregator *self,
+                                                     gboolean force_live);
+
 /**
  * GstAggregatorStartTimeSelection:
  * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.
index 23ca1ad..215b061 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <stdlib.h>
 #include <gst/check/gstcheck.h>
+#include <gst/check/gstharness.h>
 #include <gst/base/gstaggregator.h>
 
 /* dummy aggregator based element */
@@ -143,10 +144,12 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
   }
   gst_iterator_free (iter);
 
-  if (all_eos == TRUE) {
-    GST_INFO_OBJECT (testagg, "no data available, must be EOS");
-    gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
-    return GST_FLOW_EOS;
+  if (!gst_aggregator_get_force_live (aggregator)) {
+    if (all_eos == TRUE) {
+      GST_INFO_OBJECT (testagg, "no data available, must be EOS");
+      gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
+      return GST_FLOW_EOS;
+    }
   }
 
   buf = gst_buffer_new ();
@@ -188,6 +191,8 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
 
   base_aggregator_class->aggregate =
       GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
+
+  base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
 }
 
 static void
@@ -846,8 +851,8 @@ GST_START_TEST (test_flushing_seek)
   GST_BUFFER_TIMESTAMP (buf) = 0;
   _chain_data_init (&data2, test.aggregator, buf, NULL);
 
-  gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.
-              aggregator)->srcpad)->segment, GST_FORMAT_TIME);
+  gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.aggregator)->
+          srcpad)->segment, GST_FORMAT_TIME);
 
   /* now do a successful flushing seek */
   event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
@@ -1353,6 +1358,29 @@ GST_START_TEST (test_remove_pad_on_aggregate)
 
 GST_END_TEST;
 
+GST_START_TEST (test_force_live)
+{
+  GstElement *agg;
+  GstHarness *h;
+  GstBuffer *buf;
+
+  agg = gst_check_setup_element ("testaggregator");
+  g_object_set (agg, "latency", GST_USECOND, NULL);
+  gst_aggregator_set_force_live (GST_AGGREGATOR (agg), TRUE);
+  h = gst_harness_new_with_element (agg, NULL, "src");
+
+  gst_harness_play (h);
+
+  gst_harness_crank_single_clock_wait (h);
+  buf = gst_harness_pull (h);
+
+  gst_buffer_unref (buf);
+  gst_harness_teardown (h);
+  gst_object_unref (agg);
+}
+
+GST_END_TEST;
+
 static Suite *
 gst_aggregator_suite (void)
 {
@@ -1382,6 +1410,7 @@ gst_aggregator_suite (void)
   tcase_add_test (general, test_change_state_intensive);
   tcase_add_test (general, test_flush_on_aggregate);
   tcase_add_test (general, test_remove_pad_on_aggregate);
+  tcase_add_test (general, test_force_live);
 
   return suite;
 }