aggregator: Query latency on first incoming buffer.
authorOlivier Crête <olivier.crete@collabora.com>
Sat, 7 Mar 2015 02:12:52 +0000 (21:12 -0500)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
And keep on querying upstream until we get a reply.

Also, the _get_latency_unlocked() method required being calld
with a private lock, so removed the _unlocked() variant from the API.
And it now returns GST_CLOCK_TIME_NONE when the element is not live as
we think that 0 upstream latency is possible.

https://bugzilla.gnome.org/show_bug.cgi?id=745768

libs/gst/base/gstaggregator.c
libs/gst/base/gstaggregator.h

index 788c84a..b7e0b17 100644 (file)
@@ -87,6 +87,8 @@ static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
  */
 
 
+static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
+
 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
 #define GST_CAT_DEFAULT aggregator_debug
 
@@ -236,9 +238,10 @@ struct _GstAggregatorPrivate
   GstTagList *tags;
   gboolean tags_changed;
 
-  gboolean latency_live;        /* protected by src_lock */
-  GstClockTime latency_min;     /* protected by src_lock */
-  GstClockTime latency_max;     /* protected by src_lock */
+  gboolean peer_latency_live;   /* protected by src_lock */
+  GstClockTime peer_latency_min;        /* protected by src_lock */
+  GstClockTime peer_latency_max;        /* protected by src_lock */
+  gboolean has_peer_latency;
 
   GstClockTime sub_latency_min; /* protected by src_lock */
   GstClockTime sub_latency_max; /* protected by src_lock */
@@ -535,15 +538,15 @@ gst_aggregator_get_next_time (GstAggregator * self)
 static gboolean
 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 {
-  GstClockTime latency_max, latency_min;
+  GstClockTime latency;
   GstClockTime start;
-  gboolean live, res;
+  gboolean res;
 
   *timeout = FALSE;
 
   SRC_LOCK (self);
 
-  gst_aggregator_get_latency_unlocked (self, &live, &latency_min, &latency_max);
+  latency = gst_aggregator_get_latency_unlocked (self);
 
   if (gst_aggregator_check_pads_ready (self)) {
     GST_DEBUG_OBJECT (self, "all pads have data");
@@ -561,8 +564,9 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
 
   start = gst_aggregator_get_next_time (self);
 
-  if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
-      || !GST_CLOCK_TIME_IS_VALID (start)) {
+  if (!GST_CLOCK_TIME_IS_VALID (latency) ||
+      !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
+      !GST_CLOCK_TIME_IS_VALID (start)) {
     /* We wake up here when something happened, and below
      * then check if we're ready now. If we return FALSE,
      * we will be directly called again.
@@ -585,15 +589,14 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
     GST_OBJECT_UNLOCK (self);
 
     time = base_time + start;
-    time += latency_min;
+    time += latency;
 
     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
-        " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
-        " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
+        " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
+        GST_TIME_ARGS (time),
         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
-        GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
-        GST_TIME_ARGS (latency_min),
+        GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
         GST_TIME_ARGS (gst_clock_get_time (clock)));
 
 
@@ -987,6 +990,10 @@ gst_aggregator_stop (GstAggregator * agg)
   else
     result = TRUE;
 
+  agg->priv->has_peer_latency = FALSE;
+  agg->priv->peer_latency_live = FALSE;
+  agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
+
   if (agg->priv->tags)
     gst_tag_list_unref (agg->priv->tags);
   agg->priv->tags = NULL;
@@ -1110,8 +1117,10 @@ gst_aggregator_request_new_pad (GstElement * element,
   return GST_PAD (agg_pad);
 }
 
+/* Must be called with SRC_LOCK held */
+
 static gboolean
-gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
+gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
 {
   gboolean query_ret, live;
   GstClockTime our_latency, min, max;
@@ -1125,13 +1134,11 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
 
   gst_query_parse_latency (query, &live, &min, &max);
 
-  SRC_LOCK (self);
   our_latency = self->priv->latency;
 
   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
-    SRC_UNLOCK (self);
     return FALSE;
   }
 
@@ -1140,13 +1147,13 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
             GST_TIME_FORMAT ". Add queues or other buffering elements.",
             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
-    SRC_UNLOCK (self);
     return FALSE;
   }
 
-  self->priv->latency_live = live;
-  self->priv->latency_min = min;
-  self->priv->latency_max = max;
+  self->priv->peer_latency_live = live;
+  self->priv->peer_latency_min = min;
+  self->priv->peer_latency_max = max;
+  self->priv->has_peer_latency = TRUE;
 
   /* add our own */
   min += our_latency;
@@ -1170,7 +1177,6 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   }
 
   SRC_BROADCAST (self);
-  SRC_UNLOCK (self);
 
   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
@@ -1180,47 +1186,63 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
   return query_ret;
 }
 
+/*
+ * MUST be called with the src_lock held.
+ *
+ * See  gst_aggregator_get_latency() for doc
+ */
+static GstClockTime
+gst_aggregator_get_latency_unlocked (GstAggregator * self)
+{
+  GstClockTime latency;
+
+  g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
+
+  if (!self->priv->has_peer_latency) {
+    GstQuery *query = gst_query_new_latency ();
+    gboolean ret;
+
+    ret = gst_aggregator_query_latency_unlocked (self, query);
+    gst_query_unref (query);
+    if (!ret)
+      return GST_CLOCK_TIME_NONE;
+  }
+
+  if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
+    return GST_CLOCK_TIME_NONE;
+
+  /* latency_min is never GST_CLOCK_TIME_NONE by construction */
+  latency = self->priv->peer_latency_min;
+
+  /* add our own */
+  latency += self->priv->latency;
+  latency += self->priv->sub_latency_min;
+
+  return latency;
+}
+
 /**
- * gst_aggregator_get_latency_unlocked:
+ * gst_aggregator_get_latency:
  * @self: a #GstAggregator
- * @live: (out) (allow-none): whether @self is live
- * @min_latency: (out) (allow-none): the configured minimum latency of @self
- * @max_latency: (out) (allow-none): the configured maximum latency of @self
  *
- * Retreives the latency values reported by @self in response to the latency
- * query.
+ * Retrieves the latency values reported by @self in response to the latency
+ * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
+ * will not wait for the clock.
  *
  * Typically only called by subclasses.
  *
- * MUST be called with the src_lock held.
+ * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
  */
-void
-gst_aggregator_get_latency_unlocked (GstAggregator * self, gboolean * live,
-    GstClockTime * min_latency, GstClockTime * max_latency)
+GstClockTime
+gst_aggregator_get_latency (GstAggregator * self)
 {
-  GstClockTime min, max;
-
-  g_return_if_fail (GST_IS_AGGREGATOR (self));
-
-  /* latency_min is never GST_CLOCK_TIME_NONE by construction */
-  min = self->priv->latency_min;
-  max = self->priv->latency_max;
+  GstClockTime ret;
 
-  /* add our own */
-  min += self->priv->latency;
-  min += self->priv->sub_latency_min;
-  if (GST_CLOCK_TIME_IS_VALID (max)
-      && GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max))
-    max += self->priv->sub_latency_max;
-  else
-    max = GST_CLOCK_TIME_NONE;
+  SRC_LOCK (self);
+  ret = gst_aggregator_get_latency_unlocked (self);
+  SRC_UNLOCK (self);
 
-  if (live)
-    *live = self->priv->latency_live;
-  if (min_latency)
-    *min_latency = min;
-  if (max_latency)
-    *max_latency = max;
+  return ret;
 }
 
 static gboolean
@@ -1271,17 +1293,17 @@ gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
       gst_query_set_seeking (query, format, FALSE, 0, -1);
       res = TRUE;
 
-      goto discard;
+      break;
     }
     case GST_QUERY_LATENCY:
-      return gst_aggregator_query_latency (self, query);
-    default:
+      SRC_LOCK (self);
+      res = gst_aggregator_query_latency_unlocked (self, query);
+      SRC_UNLOCK (self);
       break;
+    default:
+      return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
   }
 
-  return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
-
-discard:
   return res;
 }
 
@@ -1549,9 +1571,9 @@ gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
 
   SRC_LOCK (self);
-  if (self->priv->latency_live) {
-    min = self->priv->latency_min;
-    max = self->priv->latency_max;
+  if (self->priv->peer_latency_live) {
+    min = self->priv->peer_latency_min;
+    max = self->priv->peer_latency_max;
     /* add our own */
     min += latency;
     min += self->priv->sub_latency_min;
@@ -1707,9 +1729,10 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
   priv->padcount = -1;
   priv->tags_changed = FALSE;
 
-  self->priv->latency_live = FALSE;
-  self->priv->latency_min = self->priv->sub_latency_min = 0;
-  self->priv->latency_max = self->priv->sub_latency_max = 0;
+  self->priv->peer_latency_live = FALSE;
+  self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
+  self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
+  self->priv->has_peer_latency = FALSE;
   gst_aggregator_reset_flow_values (self);
 
   self->srcpad = gst_pad_new_from_template (pad_template, "src");
index a4ebdd7..dce567b 100644 (file)
@@ -271,10 +271,7 @@ gboolean gst_aggregator_iterate_sinkpads           (GstAggregator
                                                     GstAggregatorPadForeachFunc      func,
                                                     gpointer                         user_data);
 
-void     gst_aggregator_get_latency_unlocked       (GstAggregator                 *  self,
-                                                    gboolean                      *  live,
-                                                    GstClockTime                  *  min,
-                                                    GstClockTime                  *  max);
+GstClockTime  gst_aggregator_get_latency           (GstAggregator                 *  self);
 
 G_END_DECLS