aggregator: Use the sinkpads iterator directly to query upstream latencies
authorSebastian Dröge <sebastian@centricular.com>
Thu, 19 Feb 2015 09:04:28 +0000 (11:04 +0200)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
While gst_aggregator_iterate_sinkpads() makes sure that every pad is only
visited once, even when the iterator has to resync, this is not all we have
to do for querying the latency. When the iterator resyncs we actually have
to query all pads for the latency again and forget our previous results. It
might have happened that a pad was removed, which influenced the result of
the latency query.

libs/gst/base/gstaggregator.c

index 444ddf4..947bfea 100644 (file)
@@ -1117,13 +1117,13 @@ typedef struct
 {
   GstClockTime min, max;
   gboolean live;
-  gboolean res;
 } LatencyData;
 
 static gboolean
-gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
-    GstAggregatorPad * pad, gpointer user_data)
+query_upstream_latency_fold (const GValue * item, GValue * ret,
+    gpointer user_data)
 {
+  GstPad *pad = g_value_get_object (item);
   LatencyData *data = user_data;
   GstClockTime min, max;
   GstQuery *query;
@@ -1151,7 +1151,7 @@ gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
     }
   } else {
     GST_LOG_OBJECT (pad, "latency query failed");
-    data->res = FALSE;
+    g_value_set_boolean (ret, FALSE);
   }
 
   gst_query_unref (query);
@@ -1162,19 +1162,46 @@ gst_aggregator_query_sink_latency_foreach (GstAggregator * self,
 static gboolean
 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
 {
-  GstClockTime our_latency;
+  GstIterator *it;
+  GstIteratorResult res;
+  GValue ret = G_VALUE_INIT;
+  gboolean query_ret;
   LatencyData data;
+  GstClockTime our_latency;
+
+  it = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (self));
+  g_value_init (&ret, G_TYPE_BOOLEAN);
 
+retry:
   data.min = 0;
   data.max = GST_CLOCK_TIME_NONE;
   data.live = FALSE;
-  data.res = TRUE;
+  g_value_set_boolean (&ret, TRUE);
 
   /* query upstream's latency */
-  SRC_LOCK (self);
-  gst_aggregator_iterate_sinkpads (self,
-      gst_aggregator_query_sink_latency_foreach, &data);
-  SRC_UNLOCK (self);
+  res = gst_iterator_fold (it, query_upstream_latency_fold, &ret, &data);
+  switch (res) {
+    case GST_ITERATOR_OK:
+      g_assert_not_reached ();
+      break;
+    case GST_ITERATOR_DONE:
+      break;
+    case GST_ITERATOR_ERROR:
+      g_value_set_boolean (&ret, FALSE);
+      break;
+    case GST_ITERATOR_RESYNC:
+      gst_iterator_resync (it);
+      goto retry;
+    default:
+      g_assert_not_reached ();
+      break;
+  }
+  gst_iterator_free (it);
+  query_ret = g_value_get_boolean (&ret);
+  if (!query_ret) {
+    GST_WARNING_OBJECT (self, "Latency query failed");
+    return FALSE;
+  }
 
   GST_OBJECT_LOCK (self);
   our_latency = self->priv->latency;
@@ -1224,7 +1251,7 @@ gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
 
   gst_query_set_latency (query, data.live, data.min, data.max);
 
-  return data.res;
+  return query_ret;
 }
 
 /**
@@ -1742,7 +1769,6 @@ gst_aggregator_class_init (GstAggregatorClass * klass)
           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
-  GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_query_sink_latency_foreach);
 }
 
 static void