rtpbin: alternative inter-stream syncing methods
authorMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Mon, 8 Aug 2011 10:15:20 +0000 (12:15 +0200)
committerMark Nauwelaerts <mark.nauwelaerts@collabora.co.uk>
Mon, 19 Sep 2011 09:52:03 +0000 (11:52 +0200)
... at least if not syncing to NPT time:
* either sync using RTCP SR data (as currently)
* only perform the above once using initial RTCP SR packets
* discard RTCP and sync by equating provided stream's clock-base rtptime,
  as provided by jitterbuffer (typically obtained from RTP-Info in RTSP).

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h

index e9b3164..c3ca387 100644 (file)
@@ -248,6 +248,7 @@ enum
 #define DEFAULT_AUTOREMOVE           FALSE
 #define DEFAULT_BUFFER_MODE          RTP_JITTER_BUFFER_MODE_SLAVE
 #define DEFAULT_USE_PIPELINE_CLOCK   FALSE
+#define DEFAULT_RTCP_SYNC            GST_RTP_BIN_RTCP_SYNC_ALWAYS
 #define DEFAULT_RTCP_SYNC_INTERVAL   0
 
 enum
@@ -258,6 +259,7 @@ enum
   PROP_DO_LOST,
   PROP_IGNORE_PT,
   PROP_NTP_SYNC,
+  PROP_RTCP_SYNC,
   PROP_RTCP_SYNC_INTERVAL,
   PROP_AUTOREMOVE,
   PROP_BUFFER_MODE,
@@ -265,6 +267,31 @@ enum
   PROP_LAST
 };
 
+enum
+{
+  GST_RTP_BIN_RTCP_SYNC_ALWAYS,
+  GST_RTP_BIN_RTCP_SYNC_INITIAL,
+  GST_RTP_BIN_RTCP_SYNC_RTP
+};
+
+#define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
+static GType
+gst_rtp_bin_rtcp_sync_get_type (void)
+{
+  static GType rtcp_sync_type = 0;
+  static const GEnumValue rtcp_sync_types[] = {
+    {GST_RTP_BIN_RTCP_SYNC_ALWAYS, "always", "always"},
+    {GST_RTP_BIN_RTCP_SYNC_INITIAL, "initial", "initial"},
+    {GST_RTP_BIN_RTCP_SYNC_RTP, "rtp-info", "rtp-info"},
+    {0, NULL, NULL},
+  };
+
+  if (!rtcp_sync_type) {
+    rtcp_sync_type = g_enum_register_static ("GstRTCPSync", rtcp_sync_types);
+  }
+  return rtcp_sync_type;
+}
+
 /* helper objects */
 typedef struct _GstRtpBinSession GstRtpBinSession;
 typedef struct _GstRtpBinStream GstRtpBinStream;
@@ -315,6 +342,9 @@ struct _GstRtpBinStream
   gboolean have_sync;
   /* mapping to local RTP and NTP time */
   gint64 rt_delta;
+  gint64 rtp_delta;
+  /* base rtptime in gst time */
+  gint64 clock_base;
 };
 
 #define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
@@ -780,6 +810,8 @@ gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
        * lip-sync */
       stream->have_sync = FALSE;
       stream->rt_delta = 0;
+      stream->rtp_delta = 0;
+      stream->clock_base = -100 * GST_SECOND;
     }
   }
   GST_RTP_BIN_UNLOCK (rtpbin);
@@ -984,7 +1016,8 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
 static void
 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
     guint8 * data, guint64 ntptime, guint64 last_extrtptime,
-    guint64 base_rtptime, guint64 base_time, guint clock_rate)
+    guint64 base_rtptime, guint64 base_time, guint clock_rate,
+    gint64 rtp_clock_base)
 {
   GstRtpBinClient *client;
   gboolean created;
@@ -1027,8 +1060,9 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
 
   GST_DEBUG_OBJECT (bin,
       "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
-      ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", base_rtptime,
-      last_extrtptime, local_rtp, clock_rate);
+      ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d, "
+      "clock-base %" G_GINT64_FORMAT, base_rtptime,
+      last_extrtptime, local_rtp, clock_rate, rtp_clock_base);
 
   /* calculate local RTP time in gstreamer timestamp, we essentially perform the
    * same conversion that a jitterbuffer would use to convert an rtp timestamp
@@ -1075,8 +1109,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
     stream->rt_delta = rtdiff - ntpdiff;
 
     stream_set_ts_offset (bin, stream, stream->rt_delta);
-  } else if (client->nstreams > 1) {
-    gint64 min;
+  } else {
+    gint64 min, rtp_min, clock_base = stream->clock_base;
+    gboolean all_sync, use_rtp;
+    gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync);
 
     /* calculate delta between server and receiver. last_unix is created by
      * converting the ntptime in the last SR packet to a gstreamer timestamp. This
@@ -1094,19 +1130,104 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
      * latencies).
      * The stream that has the smallest diff is selected as the reference stream,
      * all other streams will have a positive offset to this difference. */
-    min = G_MAXINT64;
+
+    /* some alternative setting allow ignoring RTCP as much as possible,
+     * for servers generating bogus ntp timeline */
+    min = rtp_min = G_MAXINT64;
+    use_rtp = FALSE;
+    if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) {
+      guint64 ext_base;
+
+      use_rtp = TRUE;
+      /* signed version for convienience */
+      clock_base = base_rtptime;
+      /* deal with possible wrap-around */
+      ext_base = base_rtptime;
+      rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base);
+      /* sanity check; base rtp and provided clock_base should be close */
+      if (rtp_clock_base >= clock_base) {
+        if (rtp_clock_base - clock_base < 10 * clock_rate) {
+          rtp_clock_base = base_time +
+              gst_util_uint64_scale_int (rtp_clock_base - clock_base,
+              GST_SECOND, clock_rate);
+        } else {
+          use_rtp = FALSE;
+        }
+      } else {
+        if (clock_base - rtp_clock_base < 10 * clock_rate) {
+          rtp_clock_base = base_time -
+              gst_util_uint64_scale_int (clock_base - rtp_clock_base,
+              GST_SECOND, clock_rate);
+        } else {
+          use_rtp = FALSE;
+        }
+      }
+      /* warn and bail for clarity out if no sane values */
+      if (!use_rtp) {
+        GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime");
+        return;
+      }
+      /* store to track changes */
+      clock_base = rtp_clock_base;
+      /* generate a fake as before,
+       * now equating rtptime obtained from RTP-Info,
+       * where the large time represent the otherwise irrelevant npt/ntp time */
+      stream->rtp_delta = (GST_SECOND << 28) - rtp_clock_base;
+    }
+
     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
       GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
 
-      if (!ostream->have_sync)
+      if (!ostream->have_sync) {
+        all_sync = FALSE;
         continue;
+      }
+
+      /* change in current stream's base from previously init'ed value
+       * leads to reset of all stream's base */
+      if (stream != ostream && stream->clock_base >= 0 &&
+          (stream->clock_base != clock_base)) {
+        GST_DEBUG_OBJECT (bin, "reset upon clock base change");
+        ostream->clock_base = -100 * GST_SECOND;
+        ostream->rtp_delta = 0;
+      }
 
       if (ostream->rt_delta < min)
         min = ostream->rt_delta;
+      if (ostream->rtp_delta < rtp_min)
+        rtp_min = ostream->rtp_delta;
     }
 
-    GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
-        min);
+    /* arrange to re-sync for each stream upon significant change,
+     * e.g. post-seek */
+    all_sync = (stream->clock_base == clock_base);
+    stream->clock_base = clock_base;
+
+    /* may need init performed above later on, but nothing more to do now */
+    if (client->nstreams <= 1)
+      return;
+
+    GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT
+        " all sync %d", client, min, all_sync);
+    GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp);
+
+    switch (rtcp_sync) {
+      case GST_RTP_BIN_RTCP_SYNC_RTP:
+        if (!use_rtp)
+          break;
+        GST_DEBUG_OBJECT (bin, "using rtp generated reports; "
+            "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min);
+        /* fall-through */
+      case GST_RTP_BIN_RTCP_SYNC_INITIAL:
+        /* if all have been synced already, do not bother further */
+        if (all_sync) {
+          GST_DEBUG_OBJECT (bin, "all streams already synced; done");
+          return;
+        }
+        break;
+      default:
+        break;
+    }
 
     /* bail out if we adjusted recently enough */
     if (all_sync && (last_unix - bin->priv->last_unix) <
@@ -1131,7 +1252,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
 
       /* calculate offset to our reference stream, this should always give a
        * positive number. */
-      ts_offset = ostream->rt_delta - min;
+      if (use_rtp)
+        ts_offset = ostream->rtp_delta - rtp_min;
+      else
+        ts_offset = ostream->rt_delta - min;
 
       stream_set_ts_offset (bin, ostream, ts_offset);
     }
@@ -1164,6 +1288,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
   guint64 base_rtptime;
   guint64 base_time;
   guint clock_rate;
+  guint64 clock_base;
   guint64 extrtptime;
   GstBuffer *buffer;
 
@@ -1179,6 +1304,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
       g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
   base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
   clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
+  clock_base = g_value_get_uint64 (gst_structure_get_value (s, "clock-base"));
   extrtptime =
       g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
   buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
@@ -1231,7 +1357,8 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
               GST_RTP_BIN_LOCK (bin);
               /* associate the stream to CNAME */
               gst_rtp_bin_associate (bin, stream, len, data,
-                  ntptime, extrtptime, base_rtptime, base_time, clock_rate);
+                  ntptime, extrtptime, base_rtptime, base_time, clock_rate,
+                  clock_base);
               GST_RTP_BIN_UNLOCK (bin);
             }
           }
@@ -1275,7 +1402,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
 
   stream->have_sync = FALSE;
   stream->rt_delta = 0;
+  stream->rtp_delta = 0;
   stream->percent = 100;
+  stream->clock_base = -100 * GST_SECOND;
   session->streams = g_slist_prepend (session->streams, stream);
 
   /* provide clock_rate to the jitterbuffer when needed */
@@ -1693,6 +1822,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
+   * GstRtpBin::rtcp-sync:
+   *
+   * If not synchronizing (directly) to the NTP clock, determines how to sync
+   * the various streams.
+   *
+   * Since: 0.10.31
+   */
+  g_object_class_install_property (gobject_class, PROP_RTCP_SYNC,
+      g_param_spec_enum ("rtcp-sync", "RTCP Sync",
+          "Use of RTCP SR in synchronization", GST_RTP_BIN_RTCP_SYNC_TYPE,
+          DEFAULT_RTCP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
    * GstRtpBin::rtcp-sync-interval:
    *
    * Determines how often to sync streams using RTCP data.
@@ -1734,6 +1876,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass)
   rtpbin->do_lost = DEFAULT_DO_LOST;
   rtpbin->ignore_pt = DEFAULT_IGNORE_PT;
   rtpbin->ntp_sync = DEFAULT_NTP_SYNC;
+  rtpbin->rtcp_sync = DEFAULT_RTCP_SYNC;
   rtpbin->rtcp_sync_interval = DEFAULT_RTCP_SYNC_INTERVAL;
   rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE;
   rtpbin->buffer_mode = DEFAULT_BUFFER_MODE;
@@ -1850,6 +1993,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
     case PROP_NTP_SYNC:
       rtpbin->ntp_sync = g_value_get_boolean (value);
       break;
+    case PROP_RTCP_SYNC:
+      g_atomic_int_set (&rtpbin->rtcp_sync, g_value_get_enum (value));
+      break;
     case PROP_RTCP_SYNC_INTERVAL:
       rtpbin->rtcp_sync_interval = g_value_get_uint (value);
       break;
@@ -1915,6 +2061,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
     case PROP_NTP_SYNC:
       g_value_set_boolean (value, rtpbin->ntp_sync);
       break;
+    case PROP_RTCP_SYNC:
+      g_value_set_enum (value, g_atomic_int_get (&rtpbin->rtcp_sync));
+      break;
     case PROP_RTCP_SYNC_INTERVAL:
       g_value_set_uint (value, rtpbin->rtcp_sync_interval);
       break;
index 7cc845c..a915787 100644 (file)
@@ -50,6 +50,7 @@ struct _GstRtpBin {
   gboolean        do_lost;
   gboolean        ignore_pt;
   gboolean        ntp_sync;
+  gint            rtcp_sync;
   guint           rtcp_sync_interval;
   RTPJitterBufferMode buffer_mode;
   gboolean        buffering;