/* Lock to prevent two src setcaps from happening at the same time */
GMutex setcaps_lock;
+
+ gboolean latency_live;
+ GstClockTime latency_min;
+ GstClockTime latency_max;
};
typedef struct
return GST_PAD (agg_pad);
}
+typedef struct
+{
+ GstClockTime min, max;
+ gboolean live;
+} LatencyData;
+
+static gboolean
+_latency_query (GstAggregator * self, GstPad * pad, gpointer user_data)
+{
+ LatencyData *data = user_data;
+ GstClockTime min, max;
+ GstQuery *query;
+ gboolean live, res;
+
+ query = gst_query_new_latency ();
+ res = gst_pad_peer_query (pad, query);
+
+ if (res) {
+ gst_query_parse_latency (query, &live, &min, &max);
+
+ GST_LOG_OBJECT (self, "got latency live:%s min:%" G_GINT64_FORMAT
+ " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
+
+ if (min > data->min)
+ data->min = min;
+
+ if (max != GST_CLOCK_TIME_NONE &&
+ ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
+ (data->max == GST_CLOCK_TIME_NONE)))
+ data->max = max;
+
+ data->live |= live;
+ }
+
+ gst_query_unref (query);
+
+ return TRUE;
+}
+
+static gboolean
+gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
+{
+ LatencyData data;
+
+ data.min = 0;
+ data.max = GST_CLOCK_TIME_NONE;
+ data.live = FALSE;
+
+ /* query upstream's latency */
+ gst_aggregator_iterate_sinkpads (self,
+ (GstAggregatorPadForeachFunc) _latency_query, &data);
+
+ if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
+ self->timeout > data.max) {
+ GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
+ ("%s", "Timeout too big"),
+ ("The requested timeout value is too big for the latency in the "
+ "current pipeline. Limiting to %" G_GINT64_FORMAT, data.max));
+ self->timeout = data.max;
+ }
+
+ self->priv->latency_live = data.live;
+ self->priv->latency_min = data.min;
+ self->priv->latency_max = data.max;
+
+ /* add our own */
+ if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+ if (GST_CLOCK_TIME_IS_VALID (data.min))
+ data.min += self->timeout;
+ if (GST_CLOCK_TIME_IS_VALID (data.max))
+ data.max += self->timeout;
+ }
+
+ GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
+ " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
+ data.max);
+
+ gst_query_set_latency (query, data.live, data.min, data.max);
+
+ return TRUE;
+}
+
static gboolean
_send_event (GstElement * element, GstEvent * event)
{
goto discard;
}
+ case GST_QUERY_LATENCY:
+ {
+ return gst_aggregator_query_latency (self, query);
+ }
default:
break;
}
* as unresponsive.
*/
static void
-gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
+gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
{
- g_return_if_fail (GST_IS_AGGREGATOR (agg));
+ g_return_if_fail (GST_IS_AGGREGATOR (self));
- GST_OBJECT_LOCK (agg);
- agg->timeout = timeout;
- GST_OBJECT_UNLOCK (agg);
+ GST_OBJECT_LOCK (self);
+
+ if (self->priv->latency_live && self->priv->latency_max != 0 &&
+ GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
+ GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
+ ("%s", "Timeout too big"),
+ ("The requested timeout value is too big for the latency in the "
+ "current pipeline. Limiting to %" G_GINT64_FORMAT,
+ self->priv->latency_max));
+ timeout = self->priv->latency_max;
+ }
+
+ self->timeout = timeout;
+ GST_OBJECT_UNLOCK (self);
}
/**
priv->padcount = -1;
priv->tags_changed = FALSE;
+
+ self->priv->latency_live = FALSE;
+ self->priv->latency_min = 0;
+ self->priv->latency_max = GST_CLOCK_TIME_NONE;
_reset_flow_values (self);
AGGREGATOR_QUEUE (self) = g_async_queue_new ();