*
* A #GstNetClientClock is typically set on a #GstPipeline with
* gst_pipeline_use_clock().
+ *
+ * If you set a #GstBus on the clock via the "bus" object property, it will
+ * send @GST_MESSAGE_INFO messages with an attached #GstStructure containing
+ * statistics about clock accuracy and network traffic.
*/
#ifdef HAVE_CONFIG_H
PROP_0,
PROP_ADDRESS,
PROP_PORT,
- PROP_ROUNDTRIP_LIMIT
+ PROP_ROUNDTRIP_LIMIT,
+ PROP_BUS
};
#define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \
gchar *address;
gint port;
+
+ GstBus *bus;
};
#define _do_init \
g_param_spec_int ("port", "port",
"The port on which the remote server is listening", 0, G_MAXUINT16,
DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_BUS,
+ g_param_spec_object ("bus", "bus",
+ "A GstBus on which to send clock status information", GST_TYPE_BUS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstNetClientClock::round-trip-limit:
self->priv->socket = NULL;
}
+ if (self->priv->bus != NULL) {
+ gst_object_unref (self->priv->bus);
+ self->priv->bus = NULL;
+ }
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
switch (prop_id) {
case PROP_ADDRESS:
+ GST_OBJECT_LOCK (self);
g_free (self->priv->address);
self->priv->address = g_value_dup_string (value);
if (self->priv->address == NULL)
self->priv->address = g_strdup (DEFAULT_ADDRESS);
+ GST_OBJECT_UNLOCK (self);
break;
case PROP_PORT:
+ GST_OBJECT_LOCK (self);
self->priv->port = g_value_get_int (value);
+ GST_OBJECT_UNLOCK (self);
break;
case PROP_ROUNDTRIP_LIMIT:
+ GST_OBJECT_LOCK (self);
self->priv->roundtrip_limit = g_value_get_uint64 (value);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_BUS:
+ GST_OBJECT_LOCK (self);
+ if (self->priv->bus)
+ gst_object_unref (self->priv->bus);
+ self->priv->bus = g_value_dup_object (value);
+ GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case PROP_ADDRESS:
+ GST_OBJECT_LOCK (self);
g_value_set_string (value, self->priv->address);
+ GST_OBJECT_UNLOCK (self);
break;
case PROP_PORT:
g_value_set_int (value, self->priv->port);
break;
case PROP_ROUNDTRIP_LIMIT:
+ GST_OBJECT_LOCK (self);
g_value_set_uint64 (value, self->priv->roundtrip_limit);
+ GST_OBJECT_UNLOCK (self);
+ break;
+ case PROP_BUS:
+ GST_OBJECT_LOCK (self);
+ g_value_set_object (value, self->priv->bus);
+ GST_OBJECT_UNLOCK (self);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstClockTime local_avg;
gdouble r_squared;
GstClock *clock;
- GstClockTime rtt;
+ GstClockTime rtt, rtt_limit;
+ GstBus *bus = NULL;
+ /* Use for discont tracking */
+ GstClockTime time_before = 0;
+ GstClockTime min_guess = 0;
+ GstClockTimeDiff time_discont;
+ gboolean synched;
+ GstClockTime internal_time, external_time, rate_num, rate_den;
+
+ GST_OBJECT_LOCK (self);
+ rtt_limit = self->priv->roundtrip_limit;
+ if (self->priv->bus)
+ bus = gst_object_ref (self->priv->bus);
+ GST_OBJECT_UNLOCK (self);
if (local_2 < local_1) {
GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
rtt = GST_CLOCK_DIFF (local_1, local_2);
- if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) {
+ if ((rtt_limit > 0) && (rtt > rtt_limit)) {
GST_LOG_OBJECT (self,
"Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
- GST_TIME_FORMAT, GST_TIME_ARGS (rtt),
- GST_TIME_ARGS (self->priv->roundtrip_limit));
+ GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
goto bogus_observation;
}
clock = GST_CLOCK_CAST (self);
- if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote,
- &r_squared)) {
+ /* Store what the clock produced as 'now' before this update */
+ gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time,
+ &rate_num, &rate_den);
+
+ min_guess =
+ gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1,
+ internal_time, external_time, rate_num, rate_den);
+ time_before =
+ gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
+ internal_time, external_time, rate_num, rate_den);
+
+ /* If the remote observation was within our min/max estimates, we're synched */
+ synched = (GST_CLOCK_DIFF (remote, min_guess) < 0
+ && GST_CLOCK_DIFF (remote, time_before) > 0);
+
+ if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote,
+ &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) {
/* ghetto formula - shorter timeout for bad correlations */
current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
current_timeout = 0;
}
+ /* Now compare the difference (discont) in the clock
+ * after this observation */
+ time_discont = GST_CLOCK_DIFF (time_before,
+ gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
+ internal_time, external_time, rate_num, rate_den));
+
+ /* If we were in sync with the remote clock, clamp the allowed
+ * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
+ * of remote time correctly windowed the actual remote time observation */
+ if (synched && ABS (time_discont) > priv->rtt_avg / 4) {
+ GstClockTimeDiff offset;
+ GstClockTime max_discont = priv->rtt_avg / 4;
+ GST_LOG_OBJECT (clock,
+ "Too large a discont, clamping to 1/2 average RTT = %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (max_discont));
+ if (time_discont > 0) { /* Too large a forward step - add a -ve offset */
+ offset = max_discont - time_discont;
+ if (-offset > external_time)
+ external_time = 0;
+ else
+ external_time += offset;
+ } else { /* Too large a backward step - add a +ve offset */
+ offset = -(max_discont + time_discont);
+ external_time += offset;
+ }
+
+ time_discont += offset;
+ }
+
+ gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time,
+ rate_num, rate_den);
+
+ if (bus) {
+ GstStructure *s;
+ GstMessage *msg;
+
+
+ s = gst_structure_new ("gst-netclock-statistics",
+ "synchronised", G_TYPE_BOOLEAN, synched,
+ "rtt", G_TYPE_UINT64, rtt,
+ "rtt-average", G_TYPE_UINT64, priv->rtt_avg,
+ "local", G_TYPE_UINT64, local_avg,
+ "remote", G_TYPE_UINT64, remote,
+ "discontinuity", G_TYPE_INT64, time_discont,
+ "remote-min-estimate", G_TYPE_UINT64, min_guess,
+ "remote-max-estimate", G_TYPE_UINT64, time_before,
+ "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess),
+ "remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before),
+ "request-send", G_TYPE_UINT64, local_1,
+ "request-receive", G_TYPE_UINT64, local_2,
+ "r-squared", G_TYPE_DOUBLE, r_squared,
+ "timeout", G_TYPE_UINT64, current_timeout,
+ "internal-time", G_TYPE_UINT64, internal_time,
+ "external-time", G_TYPE_UINT64, external_time,
+ "rate-num", G_TYPE_UINT64, rate_num,
+ "rate-den", G_TYPE_UINT64, rate_den,
+ "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time,
+ external_time), NULL);
+ msg = gst_message_new_element (GST_OBJECT (self), s);
+ gst_bus_post (bus, msg);
+ }
+
GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
+ if (bus)
+ gst_object_unref (bus);
return;
bogus_observation:
+ if (bus)
+ gst_object_unref (bus);
/* Schedule a new packet again soon */
self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
return;
}
}
}
-
GST_INFO_OBJECT (self, "shutting down net client clock thread");
return NULL;
}