aggregator: expose API for ignoring inactive pads
authorMathieu Duponchelle <mathieu@centricular.com>
Mon, 18 Oct 2021 13:56:31 +0000 (15:56 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Mon, 18 Oct 2021 22:34:11 +0000 (22:34 +0000)
An inactive pad is a pad which, in live mode, hasn't yet received
a first buffer, but has been waited on at least once.

Exposing API to support this behaviour allows users of aggregator
subclasses to request pads, and not start pushing data on those
immediately, while avoiding systematic timeouts.

Subclasses must check in explicitly to this behavior, most likely
by exposing a user-facing property, and must check whether a pad
needs ignoring when aggregating. That is because by design,
aggregator subclasses don't get a list of "ready" pads, but instead
directly iterate element->sinkpads.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/867>

subprojects/gstreamer/libs/gst/base/gstaggregator.c
subprojects/gstreamer/libs/gst/base/gstaggregator.h

index 5a6e30f..cdf8883 100644 (file)
@@ -252,7 +252,10 @@ struct _GstAggregatorPadPrivate
   guint32 last_flush_start_seqnum;
   guint32 last_flush_stop_seqnum;
 
+  /* Whether the pad hasn't received a first buffer yet */
   gboolean first_buffer;
+  /* Whether we waited once for the pad's first buffer */
+  gboolean waited_once;
 
   GQueue data;                  /* buffers, events and queries */
   GstBuffer *clipped_buffer;
@@ -299,6 +302,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
   aggpad->priv->time_level = 0;
   aggpad->priv->first_buffer = TRUE;
+  aggpad->priv->waited_once = FALSE;
 }
 
 static gboolean
@@ -400,6 +404,7 @@ struct _GstAggregatorPrivate
   /* properties */
   gint64 latency;               /* protected by both src_lock and all pad locks */
   gboolean emit_signals;
+  gboolean ignore_inactive_pads;
 };
 
 /* Seek event forwarding helper */
@@ -464,6 +469,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
   GList *l, *sinkpads;
   gboolean have_buffer = TRUE;
   gboolean have_event_or_query = FALSE;
+  guint n_ready = 0;
 
   GST_LOG_OBJECT (self, "checking pads");
 
@@ -490,6 +496,12 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
       break;
     }
 
+    if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
+        pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
+      PAD_UNLOCK (pad);
+      continue;
+    }
+
     /* Otherwise check if we have a clipped buffer or a buffer at the top of
      * the queue, and if not then this pad is not ready unless it is also EOS */
     if (!pad->priv->clipped_buffer
@@ -502,17 +514,24 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
        * There's no point in waiting for buffers on EOS pads */
       if (!pad->priv->eos)
         have_buffer = FALSE;
+      else
+        n_ready++;
     } else if (self->priv->peer_latency_live) {
       /* In live mode, having a single pad with buffers is enough to
        * generate a start time from it. In non-live mode all pads need
        * to have a buffer
        */
       self->priv->first_buffer = FALSE;
+      n_ready++;
     }
 
     PAD_UNLOCK (pad);
   }
 
+  if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+      && n_ready == 0)
+    goto no_sinkpads;
+
   if (have_event_or_query)
     goto pad_not_ready_but_event_or_query;
 
@@ -863,6 +882,18 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 
     /* we timed out */
     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
+      GList *l;
+
+      GST_OBJECT_LOCK (self);
+      for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
+        GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data);
+
+        PAD_LOCK (pad);
+        pad->priv->waited_once = TRUE;
+        PAD_UNLOCK (pad);
+      }
+      GST_OBJECT_UNLOCK (self);
+
       SRC_UNLOCK (self);
       *timeout = TRUE;
       return TRUE;
@@ -1352,7 +1383,7 @@ gst_aggregator_aggregate_func (GstAggregator * self)
     }
 
     if (timeout || flow_return >= GST_FLOW_OK) {
-      GST_TRACE_OBJECT (self, "Actually aggregating!");
+      GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout);
       flow_return = klass->aggregate (self, timeout);
     }
 
@@ -2846,6 +2877,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   priv->max_padserial = -1;
   priv->tags_changed = FALSE;
+  priv->ignore_inactive_pads = FALSE;
 
   self->priv->peer_latency_live = FALSE;
   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
@@ -3533,6 +3565,36 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
   return is_eos;
 }
 
+/**
+ * gst_aggregator_pad_is_inactive:
+ * @pad: an aggregator pad
+ *
+ * It is only valid to call this method from #GstAggregatorClass::aggregate()
+ *
+ * Returns: %TRUE if the pad is inactive, %FALSE otherwise.
+ *   See gst_aggregator_ignore_inactive_pads() for more info.
+ * Since: 1.20
+ */
+gboolean
+gst_aggregator_pad_is_inactive (GstAggregatorPad * pad)
+{
+  GstAggregator *self;
+  gboolean inactive;
+
+  self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
+
+  g_assert_nonnull (self);
+
+  PAD_LOCK (pad);
+  inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+      && pad->priv->first_buffer;
+  PAD_UNLOCK (pad);
+
+  gst_object_unref (self);
+
+  return inactive;
+}
+
 #if 0
 /*
  * gst_aggregator_merge_tags:
@@ -3762,3 +3824,46 @@ gst_aggregator_selected_samples (GstAggregator * self,
 
   self->priv->selected_samples_called_or_warned = TRUE;
 }
+
+/**
+ * gst_aggregator_set_ignore_inactive_pads:
+ * @ignore: whether inactive pads should not be waited on
+ *
+ * Subclasses should call this when they don't want to time out
+ * waiting for a pad that hasn't yet received any buffers in live
+ * mode.
+ *
+ * #GstAggregator will still wait once on each newly-added pad, making
+ * sure upstream has had a fair chance to start up.
+ *
+ * Since: 1.20
+ */
+void
+gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore)
+{
+  g_return_if_fail (GST_IS_AGGREGATOR (self));
+
+  GST_OBJECT_LOCK (self);
+  self->priv->ignore_inactive_pads = ignore;
+  GST_OBJECT_UNLOCK (self);
+}
+
+/**
+ * gst_aggregator_get_ignore_inactive_pads:
+ *
+ * Returns: whether inactive pads will not be waited on
+ * Since: 1.20
+ */
+gboolean
+gst_aggregator_get_ignore_inactive_pads (GstAggregator * self)
+{
+  gboolean ret;
+
+  g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE);
+
+  GST_OBJECT_LOCK (self);
+  ret = self->priv->ignore_inactive_pads;
+  GST_OBJECT_UNLOCK (self);
+
+  return ret;
+}
index 86fc70f..45ced12 100644 (file)
@@ -121,6 +121,9 @@ gboolean    gst_aggregator_pad_has_buffer   (GstAggregatorPad * pad);
 GST_BASE_API
 gboolean    gst_aggregator_pad_is_eos       (GstAggregatorPad *  pad);
 
+GST_BASE_API
+gboolean    gst_aggregator_pad_is_inactive  (GstAggregatorPad * pad);
+
 /*********************
  * GstAggregator API *
  ********************/
@@ -424,6 +427,13 @@ void            gst_aggregator_selected_samples     (GstAggregator
                                                      GstClockTime                   duration,
                                                      GstStructure                 * info);
 
+GST_BASE_API
+void            gst_aggregator_set_ignore_inactive_pads (GstAggregator * self,
+                                                         gboolean ignore);
+
+GST_BASE_API
+gboolean        gst_aggregator_get_ignore_inactive_pads (GstAggregator * self);
+
 /**
  * GstAggregatorStartTimeSelection:
  * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.