netclock: Implement sending statistic bus messages and discont limits
authorJan Schmidt <jan@centricular.com>
Sat, 10 Jan 2015 10:42:00 +0000 (21:42 +1100)
committerJan Schmidt <jan@centricular.com>
Wed, 21 Jan 2015 11:27:18 +0000 (22:27 +1100)
Allow setting a GstBus on the network clock client
via a new 'bus' object property. If a bus is set, the
clock will output an element message containing statistics
about new clock observations and the clock correlation.

When the local clock is synchronised with the remote, limit the
maximum jump in the clock at any point to be one average RTT to
the server. Also, publish in the bus message whether we are
synched with the remote or not.

libs/gst/net/gstnetclientclock.c

index dd1dafe..d125e89 100644 (file)
  *
  * A #GstNetClientClock is typically set on a #GstPipeline with 
  * gst_pipeline_use_clock().
+ *
+ * If you set a #GstBus on the clock via the "bus" object property, it will
+ * send @GST_MESSAGE_INFO messages with an attached #GstStructure containing
+ * statistics about clock accuracy and network traffic.
  */
 
 #ifdef HAVE_CONFIG_H
@@ -69,7 +73,8 @@ enum
   PROP_0,
   PROP_ADDRESS,
   PROP_PORT,
-  PROP_ROUNDTRIP_LIMIT
+  PROP_ROUNDTRIP_LIMIT,
+  PROP_BUS
 };
 
 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
@@ -89,6 +94,8 @@ struct _GstNetClientClockPrivate
 
   gchar *address;
   gint port;
+
+  GstBus *bus;
 };
 
 #define _do_init \
@@ -126,6 +133,10 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass)
       g_param_spec_int ("port", "port",
           "The port on which the remote server is listening", 0, G_MAXUINT16,
           DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_BUS,
+      g_param_spec_object ("bus", "bus",
+          "A GstBus on which to send clock status information", GST_TYPE_BUS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
    * GstNetClientClock::round-trip-limit:
@@ -191,6 +202,11 @@ gst_net_client_clock_finalize (GObject * object)
     self->priv->socket = NULL;
   }
 
+  if (self->priv->bus != NULL) {
+    gst_object_unref (self->priv->bus);
+    self->priv->bus = NULL;
+  }
+
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -202,16 +218,29 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_ADDRESS:
+      GST_OBJECT_LOCK (self);
       g_free (self->priv->address);
       self->priv->address = g_value_dup_string (value);
       if (self->priv->address == NULL)
         self->priv->address = g_strdup (DEFAULT_ADDRESS);
+      GST_OBJECT_UNLOCK (self);
       break;
     case PROP_PORT:
+      GST_OBJECT_LOCK (self);
       self->priv->port = g_value_get_int (value);
+      GST_OBJECT_UNLOCK (self);
       break;
     case PROP_ROUNDTRIP_LIMIT:
+      GST_OBJECT_LOCK (self);
       self->priv->roundtrip_limit = g_value_get_uint64 (value);
+      GST_OBJECT_UNLOCK (self);
+      break;
+    case PROP_BUS:
+      GST_OBJECT_LOCK (self);
+      if (self->priv->bus)
+        gst_object_unref (self->priv->bus);
+      self->priv->bus = g_value_dup_object (value);
+      GST_OBJECT_UNLOCK (self);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -227,13 +256,22 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_ADDRESS:
+      GST_OBJECT_LOCK (self);
       g_value_set_string (value, self->priv->address);
+      GST_OBJECT_UNLOCK (self);
       break;
     case PROP_PORT:
       g_value_set_int (value, self->priv->port);
       break;
     case PROP_ROUNDTRIP_LIMIT:
+      GST_OBJECT_LOCK (self);
       g_value_set_uint64 (value, self->priv->roundtrip_limit);
+      GST_OBJECT_UNLOCK (self);
+      break;
+    case PROP_BUS:
+      GST_OBJECT_LOCK (self);
+      g_value_set_object (value, self->priv->bus);
+      GST_OBJECT_UNLOCK (self);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -250,7 +288,20 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
   GstClockTime local_avg;
   gdouble r_squared;
   GstClock *clock;
-  GstClockTime rtt;
+  GstClockTime rtt, rtt_limit;
+  GstBus *bus = NULL;
+  /* Use for discont tracking */
+  GstClockTime time_before = 0;
+  GstClockTime min_guess = 0;
+  GstClockTimeDiff time_discont;
+  gboolean synched;
+  GstClockTime internal_time, external_time, rate_num, rate_den;
+
+  GST_OBJECT_LOCK (self);
+  rtt_limit = self->priv->roundtrip_limit;
+  if (self->priv->bus)
+    bus = gst_object_ref (self->priv->bus);
+  GST_OBJECT_UNLOCK (self);
 
   if (local_2 < local_1) {
     GST_LOG_OBJECT (self, "Dropping observation: receive time %" GST_TIME_FORMAT
@@ -261,11 +312,10 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
 
   rtt = GST_CLOCK_DIFF (local_1, local_2);
 
-  if ((self->priv->roundtrip_limit > 0) && (rtt > self->priv->roundtrip_limit)) {
+  if ((rtt_limit > 0) && (rtt > rtt_limit)) {
     GST_LOG_OBJECT (self,
         "Dropping observation: RTT %" GST_TIME_FORMAT " > limit %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (rtt),
-        GST_TIME_ARGS (self->priv->roundtrip_limit));
+        GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (rtt_limit));
     goto bogus_observation;
   }
 
@@ -294,8 +344,23 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
 
   clock = GST_CLOCK_CAST (self);
 
-  if (gst_clock_add_observation (GST_CLOCK (self), local_avg, remote,
-          &r_squared)) {
+  /* Store what the clock produced as 'now' before this update */
+  gst_clock_get_calibration (GST_CLOCK (self), &internal_time, &external_time,
+      &rate_num, &rate_den);
+
+  min_guess =
+      gst_clock_adjust_with_calibration (GST_CLOCK (self), local_1,
+      internal_time, external_time, rate_num, rate_den);
+  time_before =
+      gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
+      internal_time, external_time, rate_num, rate_den);
+
+  /* If the remote observation was within our min/max estimates, we're synched */
+  synched = (GST_CLOCK_DIFF (remote, min_guess) < 0
+      && GST_CLOCK_DIFF (remote, time_before) > 0);
+
+  if (gst_clock_add_observation_unapplied (GST_CLOCK (self), local_avg, remote,
+          &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) {
     /* ghetto formula - shorter timeout for bad correlations */
     current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
     current_timeout = MIN (current_timeout, gst_clock_get_timeout (clock));
@@ -303,12 +368,78 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
     current_timeout = 0;
   }
 
+  /* Now compare the difference (discont) in the clock
+   * after this observation */
+  time_discont = GST_CLOCK_DIFF (time_before,
+      gst_clock_adjust_with_calibration (GST_CLOCK (self), local_2,
+          internal_time, external_time, rate_num, rate_den));
+
+  /* If we were in sync with the remote clock, clamp the allowed
+   * discontinuity to within quarter of one RTT. In sync means our send/receive estimates
+   * of remote time correctly windowed the actual remote time observation */
+  if (synched && ABS (time_discont) > priv->rtt_avg / 4) {
+    GstClockTimeDiff offset;
+    GstClockTime max_discont = priv->rtt_avg / 4;
+    GST_LOG_OBJECT (clock,
+        "Too large a discont, clamping to 1/2 average RTT = %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (max_discont));
+    if (time_discont > 0) {     /* Too large a forward step - add a -ve offset */
+      offset = max_discont - time_discont;
+      if (-offset > external_time)
+        external_time = 0;
+      else
+        external_time += offset;
+    } else {                    /* Too large a backward step - add a +ve offset */
+      offset = -(max_discont + time_discont);
+      external_time += offset;
+    }
+
+    time_discont += offset;
+  }
+
+  gst_clock_set_calibration (GST_CLOCK (self), internal_time, external_time,
+      rate_num, rate_den);
+
+  if (bus) {
+    GstStructure *s;
+    GstMessage *msg;
+
+
+    s = gst_structure_new ("gst-netclock-statistics",
+        "synchronised", G_TYPE_BOOLEAN, synched,
+        "rtt", G_TYPE_UINT64, rtt,
+        "rtt-average", G_TYPE_UINT64, priv->rtt_avg,
+        "local", G_TYPE_UINT64, local_avg,
+        "remote", G_TYPE_UINT64, remote,
+        "discontinuity", G_TYPE_INT64, time_discont,
+        "remote-min-estimate", G_TYPE_UINT64, min_guess,
+        "remote-max-estimate", G_TYPE_UINT64, time_before,
+        "remote-min-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, min_guess),
+        "remote-max-error", G_TYPE_INT64, GST_CLOCK_DIFF (remote, time_before),
+        "request-send", G_TYPE_UINT64, local_1,
+        "request-receive", G_TYPE_UINT64, local_2,
+        "r-squared", G_TYPE_DOUBLE, r_squared,
+        "timeout", G_TYPE_UINT64, current_timeout,
+        "internal-time", G_TYPE_UINT64, internal_time,
+        "external-time", G_TYPE_UINT64, external_time,
+        "rate-num", G_TYPE_UINT64, rate_num,
+        "rate-den", G_TYPE_UINT64, rate_den,
+        "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time,
+            external_time), NULL);
+    msg = gst_message_new_element (GST_OBJECT (self), s);
+    gst_bus_post (bus, msg);
+  }
+
   GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
   self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
 
+  if (bus)
+    gst_object_unref (bus);
   return;
 
 bogus_observation:
+  if (bus)
+    gst_object_unref (bus);
   /* Schedule a new packet again soon */
   self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4);
   return;
@@ -402,7 +533,6 @@ gst_net_client_clock_thread (gpointer data)
       }
     }
   }
-
   GST_INFO_OBJECT (self, "shutting down net client clock thread");
   return NULL;
 }