aggregator: add latency query handling
authorMatthew Waters <matthew@centricular.com>
Mon, 6 Oct 2014 10:46:24 +0000 (21:46 +1100)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 2 Dec 2017 15:10:26 +0000 (15:10 +0000)
libs/gst/base/gstaggregator.c

index 42c0d8e..a4e1e8d 100644 (file)
@@ -226,6 +226,10 @@ struct _GstAggregatorPrivate
 
   /* Lock to prevent two src setcaps from happening at the same time  */
   GMutex setcaps_lock;
+
+  gboolean latency_live;
+  GstClockTime latency_min;
+  GstClockTime latency_max;
 };
 
 typedef struct
@@ -904,6 +908,88 @@ _request_new_pad (GstElement * element,
   return GST_PAD (agg_pad);
 }
 
+typedef struct
+{
+  GstClockTime min, max;
+  gboolean live;
+} LatencyData;
+
+static gboolean
+_latency_query (GstAggregator * self, GstPad * pad, gpointer user_data)
+{
+  LatencyData *data = user_data;
+  GstClockTime min, max;
+  GstQuery *query;
+  gboolean live, res;
+
+  query = gst_query_new_latency ();
+  res = gst_pad_peer_query (pad, query);
+
+  if (res) {
+    gst_query_parse_latency (query, &live, &min, &max);
+
+    GST_LOG_OBJECT (self, "got latency live:%s min:%" G_GINT64_FORMAT
+        " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
+
+    if (min > data->min)
+      data->min = min;
+
+    if (max != GST_CLOCK_TIME_NONE &&
+        ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
+            (data->max == GST_CLOCK_TIME_NONE)))
+      data->max = max;
+
+    data->live |= live;
+  }
+
+  gst_query_unref (query);
+
+  return TRUE;
+}
+
+static gboolean
+gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
+{
+  LatencyData data;
+
+  data.min = 0;
+  data.max = GST_CLOCK_TIME_NONE;
+  data.live = FALSE;
+
+  /* query upstream's latency */
+  gst_aggregator_iterate_sinkpads (self,
+      (GstAggregatorPadForeachFunc) _latency_query, &data);
+
+  if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
+      self->timeout > data.max) {
+    GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
+        ("%s", "Timeout too big"),
+        ("The requested timeout value is too big for the latency in the "
+            "current pipeline.  Limiting to %" G_GINT64_FORMAT, data.max));
+    self->timeout = data.max;
+  }
+
+  self->priv->latency_live = data.live;
+  self->priv->latency_min = data.min;
+  self->priv->latency_max = data.max;
+
+  /* add our own */
+  if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+    if (GST_CLOCK_TIME_IS_VALID (data.min))
+      data.min += self->timeout;
+    if (GST_CLOCK_TIME_IS_VALID (data.max))
+      data.max += self->timeout;
+  }
+
+  GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
+      " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
+      data.max);
+
+  gst_query_set_latency (query, data.live, data.min, data.max);
+
+  return TRUE;
+}
+
 static gboolean
 _send_event (GstElement * element, GstEvent * event)
 {
@@ -951,6 +1037,10 @@ _src_query (GstAggregator * self, GstQuery * query)
 
       goto discard;
     }
+    case GST_QUERY_LATENCY:
+    {
+      return gst_aggregator_query_latency (self, query);
+    }
     default:
       break;
   }
@@ -1216,13 +1306,24 @@ gst_aggregator_dispose (GObject * object)
  * as unresponsive.
  */
 static void
-gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
+gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
 {
-  g_return_if_fail (GST_IS_AGGREGATOR (agg));
+  g_return_if_fail (GST_IS_AGGREGATOR (self));
 
-  GST_OBJECT_LOCK (agg);
-  agg->timeout = timeout;
-  GST_OBJECT_UNLOCK (agg);
+  GST_OBJECT_LOCK (self);
+
+  if (self->priv->latency_live && self->priv->latency_max != 0 &&
+      GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
+    GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
+        ("%s", "Timeout too big"),
+        ("The requested timeout value is too big for the latency in the "
+            "current pipeline.  Limiting to %" G_GINT64_FORMAT,
+            self->priv->latency_max));
+    timeout = self->priv->latency_max;
+  }
+
+  self->timeout = timeout;
+  GST_OBJECT_UNLOCK (self);
 }
 
 /**
@@ -1343,6 +1444,10 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
 
   priv->padcount = -1;
   priv->tags_changed = FALSE;
+
+  self->priv->latency_live = FALSE;
+  self->priv->latency_min = 0;
+  self->priv->latency_max = GST_CLOCK_TIME_NONE;
   _reset_flow_values (self);
 
   AGGREGATOR_QUEUE (self) = g_async_queue_new ();