From f829ed33135cd32a960b4041d30157bd3746d8e0 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Mon, 18 Oct 2021 15:56:31 +0200 Subject: [PATCH] aggregator: expose API for ignoring inactive pads An inactive pad is a pad which, in live mode, hasn't yet received a first buffer, but has been waited on at least once. Exposing API to support this behaviour allows users of aggregator subclasses to request pads, and not start pushing data on those immediately, while avoiding systematic timeouts. Subclasses must check in explicitly to this behavior, most likely by exposing a user-facing property, and must check whether a pad needs ignoring when aggregating. That is because by design, aggregator subclasses don't get a list of "ready" pads, but instead directly iterate element->sinkpads. Part-of: --- .../gstreamer/libs/gst/base/gstaggregator.c | 107 ++++++++++++++++++++- .../gstreamer/libs/gst/base/gstaggregator.h | 10 ++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.c b/subprojects/gstreamer/libs/gst/base/gstaggregator.c index 5a6e30f..cdf8883 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.c +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.c @@ -252,7 +252,10 @@ struct _GstAggregatorPadPrivate guint32 last_flush_start_seqnum; guint32 last_flush_stop_seqnum; + /* Whether the pad hasn't received a first buffer yet */ gboolean first_buffer; + /* Whether we waited once for the pad's first buffer */ + gboolean waited_once; GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; @@ -299,6 +302,7 @@ gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad) aggpad->priv->tail_time = GST_CLOCK_TIME_NONE; aggpad->priv->time_level = 0; aggpad->priv->first_buffer = TRUE; + aggpad->priv->waited_once = FALSE; } static gboolean @@ -400,6 +404,7 @@ struct _GstAggregatorPrivate /* properties */ gint64 latency; /* protected by both src_lock and all pad locks */ gboolean emit_signals; + gboolean ignore_inactive_pads; }; /* Seek event forwarding helper */ @@ -464,6 +469,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self, GList *l, *sinkpads; gboolean have_buffer = TRUE; gboolean have_event_or_query = FALSE; + guint n_ready = 0; GST_LOG_OBJECT (self, "checking pads"); @@ -490,6 +496,12 @@ gst_aggregator_check_pads_ready (GstAggregator * self, break; } + if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live && + pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) { + PAD_UNLOCK (pad); + continue; + } + /* Otherwise check if we have a clipped buffer or a buffer at the top of * the queue, and if not then this pad is not ready unless it is also EOS */ if (!pad->priv->clipped_buffer @@ -502,17 +514,24 @@ gst_aggregator_check_pads_ready (GstAggregator * self, * There's no point in waiting for buffers on EOS pads */ if (!pad->priv->eos) have_buffer = FALSE; + else + n_ready++; } else if (self->priv->peer_latency_live) { /* In live mode, having a single pad with buffers is enough to * generate a start time from it. In non-live mode all pads need * to have a buffer */ self->priv->first_buffer = FALSE; + n_ready++; } PAD_UNLOCK (pad); } + if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live + && n_ready == 0) + goto no_sinkpads; + if (have_event_or_query) goto pad_not_ready_but_event_or_query; @@ -863,6 +882,18 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) /* we timed out */ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { + GList *l; + + GST_OBJECT_LOCK (self); + for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) { + GstAggregatorPad *pad = GST_AGGREGATOR_PAD (l->data); + + PAD_LOCK (pad); + pad->priv->waited_once = TRUE; + PAD_UNLOCK (pad); + } + GST_OBJECT_UNLOCK (self); + SRC_UNLOCK (self); *timeout = TRUE; return TRUE; @@ -1352,7 +1383,7 @@ gst_aggregator_aggregate_func (GstAggregator * self) } if (timeout || flow_return >= GST_FLOW_OK) { - GST_TRACE_OBJECT (self, "Actually aggregating!"); + GST_LOG_OBJECT (self, "Actually aggregating, timeout: %d", timeout); flow_return = klass->aggregate (self, timeout); } @@ -2846,6 +2877,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) priv->max_padserial = -1; priv->tags_changed = FALSE; + priv->ignore_inactive_pads = FALSE; self->priv->peer_latency_live = FALSE; self->priv->peer_latency_min = self->priv->sub_latency_min = 0; @@ -3533,6 +3565,36 @@ gst_aggregator_pad_is_eos (GstAggregatorPad * pad) return is_eos; } +/** + * gst_aggregator_pad_is_inactive: + * @pad: an aggregator pad + * + * It is only valid to call this method from #GstAggregatorClass::aggregate() + * + * Returns: %TRUE if the pad is inactive, %FALSE otherwise. + * See gst_aggregator_ignore_inactive_pads() for more info. + * Since: 1.20 + */ +gboolean +gst_aggregator_pad_is_inactive (GstAggregatorPad * pad) +{ + GstAggregator *self; + gboolean inactive; + + self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad))); + + g_assert_nonnull (self); + + PAD_LOCK (pad); + inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live + && pad->priv->first_buffer; + PAD_UNLOCK (pad); + + gst_object_unref (self); + + return inactive; +} + #if 0 /* * gst_aggregator_merge_tags: @@ -3762,3 +3824,46 @@ gst_aggregator_selected_samples (GstAggregator * self, self->priv->selected_samples_called_or_warned = TRUE; } + +/** + * gst_aggregator_set_ignore_inactive_pads: + * @ignore: whether inactive pads should not be waited on + * + * Subclasses should call this when they don't want to time out + * waiting for a pad that hasn't yet received any buffers in live + * mode. + * + * #GstAggregator will still wait once on each newly-added pad, making + * sure upstream has had a fair chance to start up. + * + * Since: 1.20 + */ +void +gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, gboolean ignore) +{ + g_return_if_fail (GST_IS_AGGREGATOR (self)); + + GST_OBJECT_LOCK (self); + self->priv->ignore_inactive_pads = ignore; + GST_OBJECT_UNLOCK (self); +} + +/** + * gst_aggregator_get_ignore_inactive_pads: + * + * Returns: whether inactive pads will not be waited on + * Since: 1.20 + */ +gboolean +gst_aggregator_get_ignore_inactive_pads (GstAggregator * self) +{ + gboolean ret; + + g_return_val_if_fail (GST_IS_AGGREGATOR (self), FALSE); + + GST_OBJECT_LOCK (self); + ret = self->priv->ignore_inactive_pads; + GST_OBJECT_UNLOCK (self); + + return ret; +} diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.h b/subprojects/gstreamer/libs/gst/base/gstaggregator.h index 86fc70f..45ced12 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.h +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.h @@ -121,6 +121,9 @@ gboolean gst_aggregator_pad_has_buffer (GstAggregatorPad * pad); GST_BASE_API gboolean gst_aggregator_pad_is_eos (GstAggregatorPad * pad); +GST_BASE_API +gboolean gst_aggregator_pad_is_inactive (GstAggregatorPad * pad); + /********************* * GstAggregator API * ********************/ @@ -424,6 +427,13 @@ void gst_aggregator_selected_samples (GstAggregator GstClockTime duration, GstStructure * info); +GST_BASE_API +void gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, + gboolean ignore); + +GST_BASE_API +gboolean gst_aggregator_get_ignore_inactive_pads (GstAggregator * self); + /** * GstAggregatorStartTimeSelection: * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0. -- 2.7.4