rtpbin: add ntp-sync property
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 27 Aug 2010 15:58:47 +0000 (17:58 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 6 Sep 2010 09:01:57 +0000 (11:01 +0200)
Add an ntp-sync property that will sync the received streams to the server
NTP time. This requires synchronized NTP times between the sender and receivers,
like with ntpd.

Based on patch from Thijs Vermeir.

Fixes #627796

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h

index 9ffa501..1305772 100644 (file)
@@ -236,6 +236,7 @@ enum
 #define DEFAULT_SDES                 NULL
 #define DEFAULT_DO_LOST              FALSE
 #define DEFAULT_IGNORE_PT            FALSE
+#define DEFAULT_NTP_SYNC             FALSE
 #define DEFAULT_AUTOREMOVE           FALSE
 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
@@ -247,6 +248,7 @@ enum
   PROP_SDES,
   PROP_DO_LOST,
   PROP_IGNORE_PT,
+  PROP_NTP_SYNC,
   PROP_AUTOREMOVE,
   PROP_BUFFER_MODE,
   PROP_USE_PIPELINE_CLOCK,
@@ -299,10 +301,10 @@ struct _GstRtpBinStream
   gulong demux_ptreq_sig;
   gulong demux_ptchange_sig;
 
-  /* if we have calculated a valid unix_delta for this stream */
+  /* if we have calculated a valid rt_delta for this stream */
   gboolean have_sync;
   /* mapping to local RTP and NTP time */
-  gint64 unix_delta;
+  gint64 rt_delta;
 };
 
 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
@@ -767,7 +769,7 @@ gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
       /* make use require a new SR packet for this stream before we attempt new
        * lip-sync */
       stream->have_sync = FALSE;
-      stream->unix_delta = 0;
+      stream->rt_delta = 0;
     }
   }
   GST_RTP_BIN_UNLOCK (rtpbin);
@@ -887,19 +889,102 @@ free_client (GstRtpBinClient * client, GstRtpBin * bin)
   g_free (client);
 }
 
+static void
+get_current_times (GstRtpBin * bin, GstClockTime * running_time,
+    guint64 * ntpnstime)
+{
+  guint64 ntpns;
+  GstClock *clock;
+  GstClockTime base_time, rt, clock_time;
+
+  GST_OBJECT_LOCK (bin);
+  if ((clock = GST_ELEMENT_CLOCK (bin))) {
+    base_time = GST_ELEMENT_CAST (bin)->base_time;
+    gst_object_ref (clock);
+    GST_OBJECT_UNLOCK (bin);
+
+    clock_time = gst_clock_get_time (clock);
+
+    if (bin->use_pipeline_clock) {
+      ntpns = clock_time;
+    } else {
+      GTimeVal current;
+
+      /* get current NTP time */
+      g_get_current_time (&current);
+      ntpns = GST_TIMEVAL_TO_TIME (current);
+    }
+
+    /* add constant to convert from 1970 based time to 1900 based time */
+    ntpns += (2208988800LL * GST_SECOND);
+
+    /* get current clock time and convert to running time */
+    rt = clock_time - base_time;
+
+    gst_object_unref (clock);
+  } else {
+    GST_OBJECT_UNLOCK (bin);
+    rt = -1;
+    ntpns = -1;
+  }
+  if (running_time)
+    *running_time = rt;
+  if (ntpnstime)
+    *ntpnstime = ntpns;
+}
+
+static void
+stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
+    gint64 ts_offset)
+{
+  gint64 prev_ts_offset;
+
+  g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
+
+  /* delta changed, see how much */
+  if (prev_ts_offset != ts_offset) {
+    gint64 diff;
+
+    diff = prev_ts_offset - ts_offset;
+
+    GST_DEBUG_OBJECT (bin,
+        "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT
+        ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
+
+    /* only change diff when it changed more than 4 milliseconds. This
+     * compensates for rounding errors in NTP to RTP timestamp
+     * conversions */
+    if (ABS (diff) > 4 * GST_MSECOND) {
+      if (ABS (diff) < (3 * GST_SECOND)) {
+        g_object_set (stream->buffer, "ts-offset", ts_offset, NULL);
+      } else {
+        GST_WARNING_OBJECT (bin, "offset unusually large, ignoring");
+      }
+    } else {
+      GST_DEBUG_OBJECT (bin, "offset too small, ignoring");
+    }
+  }
+  GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
+      stream->ssrc, ts_offset);
+}
+
 /* associate a stream to the given CNAME. This will make sure all streams for
  * that CNAME are synchronized together.
  * Must be called with GST_RTP_BIN_LOCK */
 static void
 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
-    guint8 * data, guint64 last_unix, guint64 last_extrtptime,
+    guint8 * data, guint64 ntptime, guint64 last_extrtptime,
     guint64 base_rtptime, guint64 base_time, guint clock_rate)
 {
   GstRtpBinClient *client;
   gboolean created;
   GSList *walk;
-  guint64 local_unix;
+  guint64 local_rt;
   guint64 local_rtp;
+  GstClockTime running_time;
+  guint64 ntpnstime;
+  gint64 ntpdiff, rtdiff;
+  guint64 last_unix;
 
   /* first find or create the CNAME */
   client = get_client (bin, len, data, &created);
@@ -924,9 +1009,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
         stream->ssrc, client, client->cname);
   }
 
-  /* take the extended rtptime we found in the SR packet and map it to the
+  /* Take the extended rtptime we found in the SR packet and map it to the
    * local rtptime. The local rtp time is used to construct timestamps on the
-   * buffers. */
+   * buffers so we will calculate what running_time corresponds to the RTP
+   * timestamp in the SR packet. */
   local_rtp = last_extrtptime - base_rtptime;
 
   GST_DEBUG_OBJECT (bin,
@@ -934,29 +1020,63 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
       ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", base_rtptime,
       last_extrtptime, local_rtp, clock_rate);
 
-  /* calculate local NTP time in gstreamer timestamp, we essentially perform the
+  /* calculate local RTP time in gstreamer timestamp, we essentially perform the
    * same conversion that a jitterbuffer would use to convert an rtp timestamp
-   * into a corresponding gstreamer timestamp. */
-  local_unix = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
-  local_unix += base_time;
-
-  /* calculate delta between server and receiver. last_unix is created by
-   * converting the ntptime in the last SR packet to a gstreamer timestamp. This
-   * delta expresses the difference to our timeline and the server timeline. */
-  stream->unix_delta = last_unix - local_unix;
+   * into a corresponding gstreamer timestamp. Note that the base_time also
+   * contains the drift between sender and receiver. */
+  local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
+  local_rt += base_time;
+
+  /* convert ntptime to unix time since 1900 */
+  last_unix = gst_util_uint64_scale (ntptime, GST_SECOND,
+      (G_GINT64_CONSTANT (1) << 32));
+
   stream->have_sync = TRUE;
 
   GST_DEBUG_OBJECT (bin,
-      "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
-      ", delta %" G_GINT64_FORMAT, local_unix, last_unix, stream->unix_delta);
+      "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT,
+      local_rt, last_unix);
 
   /* recalc inter stream playout offset, but only if there is more than one
-   * stream. */
-  if (client->nstreams > 1) {
+   * stream or we're doing NTP sync. */
+  if (bin->ntp_sync) {
+    /* For NTP sync we need to first get a snapshot of running_time and NTP
+     * time. We know at what running_time we play a certain RTP time, we also
+     * calculated when we would play the RTP time in the SR packet. Now we need
+     * to know how the running_time and the NTP time relate to eachother. */
+    get_current_times (bin, &running_time, &ntpnstime);
+
+    /* see how far away the NTP time is. This is the difference between the
+     * current NTP time and the NTP time in the last SR packet. */
+    ntpdiff = ntpnstime - last_unix;
+    /* see how far away the running_time is. This is the difference between the
+     * current running_time and the running_time of the RTP timestamp in the
+     * last SR packet. */
+    rtdiff = running_time - local_rt;
+
+    GST_DEBUG_OBJECT (bin,
+        "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT,
+        ntpnstime, last_unix);
+    GST_DEBUG_OBJECT (bin,
+        "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff,
+        rtdiff);
+
+    /* combine to get the final diff to apply to the running_time */
+    stream->rt_delta = rtdiff - ntpdiff;
+
+    stream_set_ts_offset (bin, stream, stream->rt_delta);
+  } else if (client->nstreams > 1) {
     gint64 min;
 
+    /* calculate delta between server and receiver. last_unix is created by
+     * converting the ntptime in the last SR packet to a gstreamer timestamp. This
+     * delta expresses the difference to our timeline and the server timeline. The
+     * difference in itself doesn't mean much but we can combine the delta of
+     * multiple streams to create a stream specific offset. */
+    stream->rt_delta = last_unix - local_rt;
+
     /* calculate the min of all deltas, ignoring streams that did not yet have a
-     * valid unix_delta because we did not yet receive an SR packet for those
+     * valid rt_delta because we did not yet receive an SR packet for those
      * streams.
      * We calculate the mininum because we would like to only apply positive
      * offsets to streams, delaying their playback instead of trying to speed up
@@ -971,8 +1091,8 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
       if (!ostream->have_sync)
         continue;
 
-      if (ostream->unix_delta < min)
-        min = ostream->unix_delta;
+      if (ostream->rt_delta < min)
+        min = ostream->rt_delta;
     }
 
     GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
@@ -981,7 +1101,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
     /* calculate offsets for each stream */
     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
-      gint64 ts_offset, prev_ts_offset;
+      gint64 ts_offset;
 
       /* ignore streams for which we didn't receive an SR packet yet, we
        * can't synchronize them yet. We can however sync other streams just
@@ -991,32 +1111,9 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
 
       /* calculate offset to our reference stream, this should always give a
        * positive number. */
-      ts_offset = ostream->unix_delta - min;
-
-      g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);
-
-      /* delta changed, see how much */
-      if (prev_ts_offset != ts_offset) {
-        gint64 diff;
+      ts_offset = ostream->rt_delta - min;
 
-        if (prev_ts_offset > ts_offset)
-          diff = prev_ts_offset - ts_offset;
-        else
-          diff = ts_offset - prev_ts_offset;
-
-        GST_DEBUG_OBJECT (bin,
-            "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
-            ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
-
-        /* only change diff when it changed more than 4 milliseconds. This
-         * compensates for rounding errors in NTP to RTP timestamp
-         * conversions */
-        if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
-          g_object_set (ostream->buffer, "ts-offset", ts_offset, NULL);
-        }
-      }
-      GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
-          ostream->ssrc, ts_offset);
+      stream_set_ts_offset (bin, ostream, ts_offset);
     }
   }
   return;
@@ -1114,8 +1211,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
               GST_RTP_BIN_LOCK (bin);
               /* associate the stream to CNAME */
               gst_rtp_bin_associate (bin, stream, len, data,
-                  gst_rtcp_ntp_to_unix (ntptime), extrtptime,
-                  base_rtptime, base_time, clock_rate);
+                  ntptime, extrtptime, base_rtptime, base_time, clock_rate);
               GST_RTP_BIN_UNLOCK (bin);
             }
           }
@@ -1158,7 +1254,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
   stream->demux = demux;
 
   stream->have_sync = FALSE;
-  stream->unix_delta = 0;
+  stream->rt_delta = 0;
   stream->percent = 100;
   session->streams = g_slist_prepend (session->streams, stream);
 
@@ -1547,7 +1643,6 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
       g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock",
           "Use the pipeline clock to set the NTP time in the RTCP SR messages",
           DEFAULT_AUTOREMOVE, G_PARAM_READWRITE));
-
   /**
    * GstRtpBin::buffer-mode:
    *
@@ -1559,6 +1654,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
       g_param_spec_enum ("buffer-mode", "Buffer Mode",
           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
           DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstRtpBin::ntp-sync:
+   *
+   * Synchronize received streams to the NTP clock. When the NTP clock is shared
+   * between the receivers and the senders (such as when using ntpd) this option
+   * can be used to synchronize receivers on multiple machines.
+   *
+   * Since: 0.10.21
+   */
+  g_object_class_install_property (gobject_class, PROP_NTP_SYNC,
+      g_param_spec_boolean ("ntp-sync", "Sync on NTP clock",
+          "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
   gstelement_class->request_new_pad =
@@ -1588,6 +1696,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
   rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND;
   rtpbin->do_lost = DEFAULT_DO_LOST;
   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
+  rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
   rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK;
@@ -1698,6 +1807,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
       GST_RTP_BIN_UNLOCK (rtpbin);
       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value);
       break;
+    case PROP_NTP_SYNC:
+      rtpbin->ntp_sync = g_value_get_boolean (value);
+      break;
     case PROP_IGNORE_PT:
       rtpbin->ignore_pt = g_value_get_boolean (value);
       break;
@@ -1755,6 +1867,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
     case PROP_IGNORE_PT:
       g_value_set_boolean (value, rtpbin->ignore_pt);
       break;
+    case PROP_NTP_SYNC:
+      g_value_set_boolean (value, rtpbin->ntp_sync);
+      break;
     case PROP_AUTOREMOVE:
       g_value_set_boolean (value, rtpbin->priv->autoremove);
       break;
index b64f86e..74aaac2 100644 (file)
@@ -49,6 +49,7 @@ struct _GstRtpBin {
   guint64         latency_ns;
   gboolean        do_lost;
   gboolean        ignore_pt;
+  gboolean        ntp_sync;
   RTPJitterBufferMode buffer_mode;
   gboolean        buffering;
   gboolean        use_pipeline_clock;