rtpjitterbuffer: Add RFC7273 media clock handling
authorSebastian Dröge <sebastian@centricular.com>
Tue, 5 Jan 2016 14:15:16 +0000 (16:15 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Sun, 3 Apr 2016 08:24:34 +0000 (11:24 +0300)
https://bugzilla.gnome.org/show_bug.cgi?id=762259

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h
gst/rtsp/gstrtspsrc.c
gst/rtsp/gstrtspsrc.h

index e6bacf2..cbe6901 100644 (file)
@@ -298,6 +298,7 @@ enum
 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
 #define DEFAULT_MAX_DROPOUT_TIME     60000
 #define DEFAULT_MAX_MISORDER_TIME    2000
+#define DEFAULT_RFC7273_SYNC         FALSE
 
 enum
 {
@@ -320,7 +321,8 @@ enum
   PROP_RTCP_SYNC_SEND_TIME,
   PROP_MAX_RTCP_RTP_TIME_DIFF,
   PROP_MAX_DROPOUT_TIME,
-  PROP_MAX_MISORDER_TIME
+  PROP_MAX_MISORDER_TIME,
+  PROP_RFC7273_SYNC
 };
 
 #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type())
@@ -1621,6 +1623,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
       rtpbin->max_rtcp_rtp_time_diff, NULL);
   g_object_set (buffer, "max-dropout-time", rtpbin->max_dropout_time,
       "max-misorder-time", rtpbin->max_misorder_time, NULL);
+  g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
 
   g_signal_emit (rtpbin, gst_rtp_bin_signals[SIGNAL_NEW_JITTERBUFFER], 0,
       buffer, session->id, ssrc);
@@ -2314,6 +2317,12 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
+      g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
+          "Synchronize received streams to the RFC7273 clock "
+          "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state);
   gstelement_class->request_new_pad =
       GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad);
@@ -2383,6 +2392,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin)
   rtpbin->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
+  rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
 
   /* some default SDES entries */
   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
@@ -2599,6 +2609,11 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
       gst_rtp_bin_propagate_property_to_session (rtpbin, "max-dropout-time",
           value);
       break;
+    case PROP_RFC7273_SYNC:
+      rtpbin->rfc7273_sync = g_value_get_boolean (value);
+      gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
+          "rfc7273-sync", value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2681,6 +2696,8 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
     case PROP_MAX_MISORDER_TIME:
       g_value_set_uint (value, rtpbin->max_misorder_time);
       break;
+    case PROP_RFC7273_SYNC:
+      g_value_set_boolean (value, rtpbin->rfc7273_sync);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
index 01539d2..15eb88a 100644 (file)
@@ -73,6 +73,7 @@ struct _GstRtpBin {
   gint            max_rtcp_rtp_time_diff;
   guint32         max_dropout_time;
   guint32         max_misorder_time;
+  gboolean        rfc7273_sync;
 
   /* a list of session */
   GSList         *sessions;
index 65f7e14..ae2bf9d 100644 (file)
 #endif
 
 #include <stdlib.h>
+#include <stdio.h>
 #include <string.h>
 #include <gst/rtp/gstrtpbuffer.h>
+#include <gst/net/net.h>
 
 #include "gstrtpjitterbuffer.h"
 #include "rtpjitterbuffer.h"
@@ -141,6 +143,7 @@ enum
 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
 #define DEFAULT_MAX_DROPOUT_TIME    60000
 #define DEFAULT_MAX_MISORDER_TIME   2000
+#define DEFAULT_RFC7273_SYNC        FALSE
 
 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
@@ -166,7 +169,8 @@ enum
   PROP_STATS,
   PROP_MAX_RTCP_RTP_TIME_DIFF,
   PROP_MAX_DROPOUT_TIME,
-  PROP_MAX_MISORDER_TIME
+  PROP_MAX_MISORDER_TIME,
+  PROP_RFC7273_SYNC
 };
 
 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
@@ -413,6 +417,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
     GstPad * pad);
 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
+static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
+    GstClock * clock);
 
 /* pad overrides */
 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
@@ -745,6 +751,12 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
+      g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
+          "Synchronize received streams to the RFC7273 clock "
+          "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   /**
    * GstRtpJitterBuffer::request-pt-map:
    * @buffer: the object which received the signal
@@ -820,6 +832,8 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
   gstelement_class->provide_clock =
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
+  gstelement_class->set_clock =
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
 
   gst_element_class_add_static_pad_template (gstelement_class,
       &gst_rtp_jitter_buffer_src_template);
@@ -1129,6 +1143,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element)
   return gst_system_clock_obtain ();
 }
 
+static gboolean
+gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
+{
+  GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+
+  rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
+
+  return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
+}
+
 static void
 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
 {
@@ -1233,6 +1257,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
   GstStructure *caps_struct;
   guint val;
   GstClockTime tval;
+  const gchar *ts_refclk, *mediaclk;
 
   priv = jitterbuffer->priv;
 
@@ -1297,6 +1322,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
 
+  if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
+    GstClock *clock = NULL;
+    guint64 clock_offset = -1;
+
+    GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
+        ts_refclk);
+
+    if (g_str_has_prefix (ts_refclk, "ntp=")) {
+      if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
+        GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
+      } else {
+        const gchar *host, *portstr;
+        gchar *hostname;
+        guint port;
+
+        host = ts_refclk + sizeof ("ntp=") - 1;
+        if (host[0] == '[') {
+          /* IPv6 */
+          portstr = strchr (host, ']');
+          if (portstr && portstr[1] == ':')
+            portstr = portstr + 1;
+          else
+            portstr = NULL;
+        } else {
+          portstr = strrchr (host, ':');
+        }
+
+
+        if (!portstr || sscanf (portstr, ":%u", &port) != 1)
+          port = 123;
+
+        if (portstr)
+          hostname = g_strndup (host, (portstr - host));
+        else
+          hostname = g_strdup (host);
+
+        clock = gst_ntp_clock_new (NULL, hostname, port, 0);
+        g_free (hostname);
+      }
+    } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
+      const gchar *domainstr =
+          ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
+      guint domain;
+
+      if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
+        domain = 0;
+
+      clock = gst_ptp_clock_new (NULL, domain);
+    } else {
+      GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
+    }
+
+    if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
+      GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
+
+      if (!g_str_has_prefix (mediaclk, "direct=")
+          || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
+        GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
+      if (strstr (mediaclk, "rate=") != NULL) {
+        GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
+        clock_offset = -1;
+      }
+    }
+
+    rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
+  } else {
+    rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
+  }
+
   return TRUE;
 
   /* ERRORS */
@@ -1566,7 +1660,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
 
   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
-  rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+  rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
   if (head)
     JBUF_SIGNAL_EVENT (priv);
 
@@ -2625,7 +2719,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
           RTPJitterBufferItem *item;
 
           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
-          rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+          rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
         }
         g_list_free (events);
 
@@ -2741,7 +2835,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
-              &head, &percent)))
+              &head, &percent,
+              gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)))))
     goto duplicate;
 
   /* update timers */
@@ -3311,7 +3406,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
 
   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
-  rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+  rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
 
   /* remove timer now */
   remove_timer (jitterbuffer, timer);
@@ -3780,7 +3875,7 @@ gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
             RTP_JITTER_BUFFER_MODE_BUFFER) {
           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
-          rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+          rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL, -1);
           if (head)
             JBUF_SIGNAL_EVENT (priv);
           JBUF_WAIT_QUERY (priv, out_flushing);
@@ -4018,6 +4113,12 @@ gst_rtp_jitter_buffer_set_property (GObject * object,
       priv->max_misorder_time = g_value_get_uint (value);
       JBUF_UNLOCK (priv);
       break;
+    case PROP_RFC7273_SYNC:
+      JBUF_LOCK (priv);
+      rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
+          g_value_get_boolean (value));
+      JBUF_UNLOCK (priv);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -4138,6 +4239,12 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
       g_value_set_uint (value, priv->max_misorder_time);
       JBUF_UNLOCK (priv);
       break;
+    case PROP_RFC7273_SYNC:
+      JBUF_LOCK (priv);
+      g_value_set_boolean (value,
+          rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
+      JBUF_UNLOCK (priv);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 80d8266..41aa8e6 100644 (file)
@@ -85,6 +85,8 @@ rtp_jitter_buffer_class_init (RTPJitterBufferClass * klass)
 static void
 rtp_jitter_buffer_init (RTPJitterBuffer * jbuf)
 {
+  g_mutex_init (&jbuf->clock_lock);
+
   jbuf->packets = g_queue_new ();
   jbuf->mode = RTP_JITTER_BUFFER_MODE_SLAVE;
 
@@ -98,8 +100,19 @@ rtp_jitter_buffer_finalize (GObject * object)
 
   jbuf = RTP_JITTER_BUFFER_CAST (object);
 
+  if (jbuf->media_clock_synced_id)
+    g_signal_handler_disconnect (jbuf->media_clock,
+        jbuf->media_clock_synced_id);
+  if (jbuf->media_clock)
+    gst_object_unref (jbuf->media_clock);
+
+  if (jbuf->pipeline_clock)
+    gst_object_unref (jbuf->pipeline_clock);
+
   g_queue_free (jbuf->packets);
 
+  g_mutex_clear (&jbuf->clock_lock);
+
   G_OBJECT_CLASS (rtp_jitter_buffer_parent_class)->finalize (object);
 }
 
@@ -199,6 +212,110 @@ rtp_jitter_buffer_get_clock_rate (RTPJitterBuffer * jbuf)
   return jbuf->clock_rate;
 }
 
+static void
+media_clock_synced_cb (GstClock * clock, gboolean synced,
+    RTPJitterBuffer * jbuf)
+{
+  GstClockTime internal, external;
+
+  g_mutex_lock (&jbuf->clock_lock);
+  if (jbuf->pipeline_clock) {
+    internal = gst_clock_get_internal_time (jbuf->media_clock);
+    external = gst_clock_get_time (jbuf->pipeline_clock);
+
+    gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+  }
+  g_mutex_unlock (&jbuf->clock_lock);
+}
+
+/**
+ * rtp_jitter_buffer_set_media_clock:
+ * @jbuf: an #RTPJitterBuffer
+ * @clock: (transfer full): media #GstClock
+ * @clock_offset: RTP time at clock epoch or -1
+ *
+ * Sets the media clock for the media and the clock offset
+ *
+ */
+void
+rtp_jitter_buffer_set_media_clock (RTPJitterBuffer * jbuf, GstClock * clock,
+    guint64 clock_offset)
+{
+  g_mutex_lock (&jbuf->clock_lock);
+  if (jbuf->media_clock) {
+    if (jbuf->media_clock_synced_id)
+      g_signal_handler_disconnect (jbuf->media_clock,
+          jbuf->media_clock_synced_id);
+    jbuf->media_clock_synced_id = 0;
+    gst_object_unref (jbuf->media_clock);
+  }
+  jbuf->media_clock = clock;
+  jbuf->media_clock_offset = clock_offset;
+
+  if (jbuf->pipeline_clock && jbuf->media_clock &&
+      jbuf->pipeline_clock != jbuf->media_clock) {
+    jbuf->media_clock_synced_id =
+        g_signal_connect (jbuf->media_clock, "synced",
+        G_CALLBACK (media_clock_synced_cb), jbuf);
+    if (gst_clock_is_synced (jbuf->media_clock)) {
+      GstClockTime internal, external;
+
+      internal = gst_clock_get_internal_time (jbuf->media_clock);
+      external = gst_clock_get_time (jbuf->pipeline_clock);
+
+      gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+    }
+
+    gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
+  }
+  g_mutex_unlock (&jbuf->clock_lock);
+}
+
+/**
+ * rtp_jitter_buffer_set_pipeline_clock:
+ * @jbuf: an #RTPJitterBuffer
+ * @clock: pipeline #GstClock
+ *
+ * Sets the pipeline clock
+ *
+ */
+void
+rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer * jbuf, GstClock * clock)
+{
+  g_mutex_lock (&jbuf->clock_lock);
+  if (jbuf->pipeline_clock)
+    gst_object_unref (jbuf->pipeline_clock);
+  jbuf->pipeline_clock = clock ? gst_object_ref (clock) : NULL;
+
+  if (jbuf->pipeline_clock && jbuf->media_clock &&
+      jbuf->pipeline_clock != jbuf->media_clock) {
+    if (gst_clock_is_synced (jbuf->media_clock)) {
+      GstClockTime internal, external;
+
+      internal = gst_clock_get_internal_time (jbuf->media_clock);
+      external = gst_clock_get_time (jbuf->pipeline_clock);
+
+      gst_clock_set_calibration (jbuf->media_clock, internal, external, 1, 1);
+    }
+
+    gst_clock_set_master (jbuf->media_clock, jbuf->pipeline_clock);
+  }
+  g_mutex_unlock (&jbuf->clock_lock);
+}
+
+gboolean
+rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer * jbuf)
+{
+  return jbuf->rfc7273_sync;
+}
+
+void
+rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer * jbuf,
+    gboolean rfc7273_sync)
+{
+  jbuf->rfc7273_sync = rfc7273_sync;
+}
+
 /**
  * rtp_jitter_buffer_reset_skew:
  * @jbuf: an #RTPJitterBuffer
@@ -211,6 +328,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
   jbuf->base_time = -1;
   jbuf->base_rtptime = -1;
   jbuf->base_extrtp = -1;
+  jbuf->media_clock_base_time = -1;
   jbuf->ext_rtptime = -1;
   jbuf->last_rtptime = -1;
   jbuf->window_pos = 0;
@@ -220,6 +338,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
   jbuf->prev_send_diff = -1;
   jbuf->prev_out_time = -1;
   jbuf->need_resync = TRUE;
+
   GST_DEBUG ("reset skew correction");
 }
 
@@ -241,6 +360,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
     GstClockTime gstrtptime, guint64 ext_rtptime, gboolean reset_skew)
 {
   jbuf->base_time = time;
+  jbuf->media_clock_base_time = -1;
   jbuf->base_rtptime = gstrtptime;
   jbuf->base_extrtp = ext_rtptime;
   jbuf->prev_out_time = -1;
@@ -406,55 +526,18 @@ update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
  * Returns: @time adjusted with the clock skew.
  */
 static GstClockTime
-calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time)
+calculate_skew (RTPJitterBuffer * jbuf, guint64 ext_rtptime,
+    GstClockTime gstrtptime, GstClockTime time)
 {
-  guint64 ext_rtptime;
   guint64 send_diff, recv_diff;
   gint64 delta;
   gint64 old;
   gint pos, i;
-  GstClockTime gstrtptime, out_time;
+  GstClockTime out_time;
   guint64 slope;
 
-  ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
-
-  if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime)
-    return jbuf->prev_out_time;
-
-  gstrtptime =
-      gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
-
-  /* keep track of the last extended rtptime */
-  jbuf->last_rtptime = ext_rtptime;
-
-  send_diff = 0;
-  if (G_LIKELY (jbuf->base_rtptime != -1)) {
-    /* check elapsed time in RTP units */
-    if (G_LIKELY (gstrtptime >= jbuf->base_rtptime)) {
-      send_diff = gstrtptime - jbuf->base_rtptime;
-    } else {
-      /* elapsed time at sender, timestamps can go backwards and thus be
-       * smaller than our base time, schedule to take a new base time in
-       * that case. */
-      GST_WARNING ("backward timestamps at server, schedule resync");
-      jbuf->need_resync = TRUE;
-      send_diff = 0;
-    }
-  }
-
-  /* need resync, lock on to time and gstrtptime if we can, otherwise we
-   * do with the previous values */
-  if (G_UNLIKELY (jbuf->need_resync && time != -1)) {
-    GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime));
-    rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, FALSE);
-    send_diff = 0;
-  }
-
-  GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
-      GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
-      GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
-      GST_TIME_ARGS (send_diff));
+  /* elapsed time at sender */
+  send_diff = gstrtptime - jbuf->base_rtptime;
 
   /* we don't have an arrival timestamp so we can't do skew detection. we
    * should still apply a timestamp based on RTP timestamp and base_time */
@@ -574,40 +657,9 @@ no_skew:
     } else {
       out_time += jbuf->skew;
     }
-    /* check if timestamps are not going backwards, we can only check this if we
-     * have a previous out time and a previous send_diff */
-    if (G_LIKELY (jbuf->prev_out_time != -1 && jbuf->prev_send_diff != -1)) {
-      /* now check for backwards timestamps */
-      if (G_UNLIKELY (
-              /* if the server timestamps went up and the out_time backwards */
-              (send_diff > jbuf->prev_send_diff
-                  && out_time < jbuf->prev_out_time) ||
-              /* if the server timestamps went backwards and the out_time forwards */
-              (send_diff < jbuf->prev_send_diff
-                  && out_time > jbuf->prev_out_time) ||
-              /* if the server timestamps did not change */
-              send_diff == jbuf->prev_send_diff)) {
-        GST_DEBUG ("backwards timestamps, using previous time");
-        out_time = jbuf->prev_out_time;
-      }
-    }
-    if (time != -1 && out_time + jbuf->delay < time) {
-      /* if we are going to produce a timestamp that is later than the input
-       * timestamp, we need to reset the jitterbuffer. Likely the server paused
-       * temporarily */
-      GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %"
-          GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (out_time),
-          jbuf->delay, GST_TIME_ARGS (time));
-      rtp_jitter_buffer_resync (jbuf, time, gstrtptime, ext_rtptime, TRUE);
-      out_time = time;
-      send_diff = 0;
-    }
   } else
     out_time = -1;
 
-  jbuf->prev_out_time = out_time;
-  jbuf->prev_send_diff = send_diff;
-
   GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
       jbuf->skew, GST_TIME_ARGS (out_time));
 
@@ -642,6 +694,7 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
  * @item: an #RTPJitterBufferItem to insert
  * @head: TRUE when the head element changed.
  * @percent: the buffering percent after insertion
+ * @base_time: base time of the pipeline
  *
  * Inserts @item into the packet queue of @jbuf. The sequence number of the
  * packet will be used to sort the packets. This function takes ownerhip of
@@ -655,12 +708,16 @@ queue_do_insert (RTPJitterBuffer * jbuf, GList * list, GList * item)
  */
 gboolean
 rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
-    gboolean * head, gint * percent)
+    gboolean * head, gint * percent, GstClockTime base_time)
 {
   GList *list, *event = NULL;
   guint32 rtptime;
+  guint64 ext_rtptime;
   guint16 seqnum;
-  GstClockTime dts;
+  GstClockTime gstrtptime, dts;
+  GstClock *media_clock, *pipeline_clock;
+  guint64 media_clock_offset;
+  gboolean rfc7273_mode;
 
   g_return_val_if_fail (jbuf != NULL, FALSE);
   g_return_val_if_fail (item != NULL, FALSE);
@@ -737,6 +794,37 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
     }
   }
 
+  /* Return the last time if we got the same RTP timestamp again */
+  ext_rtptime = gst_rtp_buffer_ext_timestamp (&jbuf->ext_rtptime, rtptime);
+  if (jbuf->last_rtptime != -1 && ext_rtptime == jbuf->last_rtptime) {
+    item->pts = jbuf->prev_out_time;
+    goto append;
+  }
+
+  /* keep track of the last extended rtptime */
+  jbuf->last_rtptime = ext_rtptime;
+
+  g_mutex_lock (&jbuf->clock_lock);
+  media_clock = jbuf->media_clock ? gst_object_ref (jbuf->media_clock) : NULL;
+  pipeline_clock =
+      jbuf->pipeline_clock ? gst_object_ref (jbuf->pipeline_clock) : NULL;
+  media_clock_offset = jbuf->media_clock_offset;
+  g_mutex_unlock (&jbuf->clock_lock);
+
+  gstrtptime =
+      gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, jbuf->clock_rate);
+
+  if (G_LIKELY (jbuf->base_rtptime != -1)) {
+    /* check elapsed time in RTP units */
+    if (gstrtptime < jbuf->base_rtptime) {
+      /* elapsed time at sender, timestamps can go backwards and thus be
+       * smaller than our base time, schedule to take a new base time in
+       * that case. */
+      GST_WARNING ("backward timestamps at server, schedule resync");
+      jbuf->need_resync = TRUE;
+    }
+  }
+
   switch (jbuf->mode) {
     case RTP_JITTER_BUFFER_MODE_NONE:
     case RTP_JITTER_BUFFER_MODE_BUFFER:
@@ -752,16 +840,178 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, RTPJitterBufferItem * item,
     case RTP_JITTER_BUFFER_MODE_SYNCED:
       /* synchronized clocks, take first timestamp as base, use RTP timestamps
        * to interpolate */
-      if (jbuf->base_time != -1)
+      if (jbuf->base_time != -1 && !jbuf->need_resync)
         dts = -1;
       break;
     case RTP_JITTER_BUFFER_MODE_SLAVE:
     default:
       break;
   }
-  /* do skew calculation by measuring the difference between rtptime and the
-   * receive dts, this function will return the skew corrected rtptime. */
-  item->pts = calculate_skew (jbuf, rtptime, dts);
+
+  /* need resync, lock on to time and gstrtptime if we can, otherwise we
+   * do with the previous values */
+  if (G_UNLIKELY (jbuf->need_resync && dts != -1)) {
+    GST_INFO ("resync to time %" GST_TIME_FORMAT ", rtptime %"
+        GST_TIME_FORMAT, GST_TIME_ARGS (time), GST_TIME_ARGS (gstrtptime));
+    rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, FALSE);
+  }
+
+  GST_DEBUG ("extrtp %" G_GUINT64_FORMAT ", gstrtp %" GST_TIME_FORMAT ", base %"
+      GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT, ext_rtptime,
+      GST_TIME_ARGS (gstrtptime), GST_TIME_ARGS (jbuf->base_rtptime),
+      GST_TIME_ARGS (gstrtptime - jbuf->base_rtptime));
+
+  rfc7273_mode = media_clock && pipeline_clock
+      && gst_clock_is_synced (media_clock);
+
+  if (rfc7273_mode && jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
+      && (media_clock_offset == -1 || !jbuf->rfc7273_sync)) {
+    GstClockTime internal, external;
+    GstClockTime rate_num, rate_denom;
+    GstClockTime nsrtptimediff, rtpntptime, rtpsystime;
+
+    gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
+        &rate_denom);
+
+    /* Slave to the RFC7273 media clock instead of trying to estimate it
+     * based on receive times and RTP timestamps */
+
+    if (jbuf->media_clock_base_time == -1) {
+      if (jbuf->base_time != -1) {
+        jbuf->media_clock_base_time =
+            gst_clock_unadjust_with_calibration (media_clock,
+            jbuf->base_time + base_time, internal, external, rate_num,
+            rate_denom);
+      } else {
+        if (dts != -1)
+          jbuf->media_clock_base_time =
+              gst_clock_unadjust_with_calibration (media_clock, dts + base_time,
+              internal, external, rate_num, rate_denom);
+        else
+          jbuf->media_clock_base_time =
+              gst_clock_get_internal_time (media_clock);
+        jbuf->base_rtptime = gstrtptime;
+      }
+    }
+
+    if (gstrtptime > jbuf->base_rtptime)
+      nsrtptimediff = gstrtptime - jbuf->base_rtptime;
+    else
+      nsrtptimediff = 0;
+
+    rtpntptime = nsrtptimediff + jbuf->media_clock_base_time;
+
+    rtpsystime =
+        gst_clock_adjust_with_calibration (media_clock, rtpntptime, internal,
+        external, rate_num, rate_denom);
+
+    if (rtpsystime > base_time)
+      item->pts = rtpsystime - base_time;
+    else
+      item->pts = 0;
+
+    GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
+  } else if (rfc7273_mode && (jbuf->mode == RTP_JITTER_BUFFER_MODE_SLAVE
+          || jbuf->mode == RTP_JITTER_BUFFER_MODE_SYNCED)
+      && media_clock_offset != -1 && jbuf->rfc7273_sync) {
+    GstClockTime ntptime, rtptime_tmp;
+    GstClockTime ntprtptime, rtpsystime;
+    GstClockTime internal, external;
+    GstClockTime rate_num, rate_denom;
+
+    /* Don't do any of the dts related adjustments further down */
+    dts = -1;
+
+    /* Calculate the actual clock time on the sender side based on the
+     * RFC7273 clock and convert it to our pipeline clock
+     */
+
+    gst_clock_get_calibration (media_clock, &internal, &external, &rate_num,
+        &rate_denom);
+
+    ntptime = gst_clock_get_internal_time (media_clock);
+
+    ntprtptime = gst_util_uint64_scale (ntptime, jbuf->clock_rate, GST_SECOND);
+    ntprtptime += media_clock_offset;
+    ntprtptime &= 0xffffffff;
+
+    rtptime_tmp = rtptime;
+    /* Check for wraparounds, we assume that the diff between current RTP
+     * timestamp and current media clock time can't be bigger than
+     * 2**31 clock units */
+    if (ntprtptime > rtptime_tmp && ntprtptime - rtptime_tmp >= 0x80000000)
+      rtptime_tmp += G_GUINT64_CONSTANT (0x100000000);
+    else if (rtptime_tmp > ntprtptime && rtptime_tmp - ntprtptime >= 0x80000000)
+      ntprtptime += G_GUINT64_CONSTANT (0x100000000);
+
+    if (ntprtptime > rtptime_tmp)
+      ntptime -=
+          gst_util_uint64_scale (ntprtptime - rtptime_tmp, jbuf->clock_rate,
+          GST_SECOND);
+    else
+      ntptime +=
+          gst_util_uint64_scale (rtptime_tmp - ntprtptime, jbuf->clock_rate,
+          GST_SECOND);
+
+    rtpsystime =
+        gst_clock_adjust_with_calibration (media_clock, ntptime, internal,
+        external, rate_num, rate_denom);
+    /* All this assumes that the pipeline has enough additional
+     * latency to cover for the network delay */
+    if (rtpsystime > base_time)
+      item->pts = rtpsystime - base_time;
+    else
+      item->pts = 0;
+
+    GST_DEBUG ("RFC7273 clock time %" GST_TIME_FORMAT ", out %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (rtpsystime), GST_TIME_ARGS (item->pts));
+  } else {
+    /* If we used the RFC7273 clock before and not anymore,
+     * we need to resync it later again */
+    jbuf->media_clock_base_time = -1;
+
+    /* do skew calculation by measuring the difference between rtptime and the
+     * receive dts, this function will return the skew corrected rtptime. */
+    item->pts = calculate_skew (jbuf, ext_rtptime, gstrtptime, dts);
+  }
+
+  /* check if timestamps are not going backwards, we can only check this if we
+   * have a previous out time and a previous send_diff */
+  if (G_LIKELY (item->pts != -1 && jbuf->prev_out_time != -1
+          && jbuf->prev_send_diff != -1)) {
+    /* now check for backwards timestamps */
+    if (G_UNLIKELY (
+            /* if the server timestamps went up and the out_time backwards */
+            (gstrtptime - jbuf->base_rtptime > jbuf->prev_send_diff
+                && item->pts < jbuf->prev_out_time) ||
+            /* if the server timestamps went backwards and the out_time forwards */
+            (gstrtptime - jbuf->base_rtptime < jbuf->prev_send_diff
+                && item->pts > jbuf->prev_out_time) ||
+            /* if the server timestamps did not change */
+            gstrtptime - jbuf->base_rtptime == jbuf->prev_send_diff)) {
+      GST_DEBUG ("backwards timestamps, using previous time");
+      item->pts = jbuf->prev_out_time;
+    }
+  }
+  if (dts != -1 && item->pts + jbuf->delay < dts) {
+    /* if we are going to produce a timestamp that is later than the input
+     * timestamp, we need to reset the jitterbuffer. Likely the server paused
+     * temporarily */
+    GST_DEBUG ("out %" GST_TIME_FORMAT " + %" G_GUINT64_FORMAT " < time %"
+        GST_TIME_FORMAT ", reset jitterbuffer", GST_TIME_ARGS (item->pts),
+        jbuf->delay, GST_TIME_ARGS (time));
+    rtp_jitter_buffer_resync (jbuf, dts, gstrtptime, ext_rtptime, TRUE);
+    item->pts = dts;
+  }
+
+  jbuf->prev_out_time = item->pts;
+  jbuf->prev_send_diff = gstrtptime - jbuf->base_rtptime;
+
+  if (media_clock)
+    gst_object_unref (media_clock);
+  if (pipeline_clock)
+    gst_object_unref (pipeline_clock);
 
 append:
   queue_do_insert (jbuf, list, (GList *) item);
index ba8da6d..c7115fe 100644 (file)
@@ -89,6 +89,7 @@ struct _RTPJitterBuffer {
   gboolean       need_resync;
   GstClockTime   base_time;
   GstClockTime   base_rtptime;
+  GstClockTime   media_clock_base_time;
   guint32        clock_rate;
   GstClockTime   base_extrtp;
   GstClockTime   prev_out_time;
@@ -102,6 +103,14 @@ struct _RTPJitterBuffer {
   gint64         skew;
   gint64         prev_send_diff;
   gboolean       buffering_disabled;
+
+  GMutex         clock_lock;
+  GstClock      *pipeline_clock;
+  GstClock      *media_clock;
+  gulong         media_clock_synced_id;
+  guint64        media_clock_offset;
+
+  gboolean       rfc7273_sync;
 };
 
 struct _RTPJitterBufferClass {
@@ -150,11 +159,18 @@ void                  rtp_jitter_buffer_set_delay        (RTPJitterBuffer *jbuf,
 void                  rtp_jitter_buffer_set_clock_rate   (RTPJitterBuffer *jbuf, guint32 clock_rate);
 guint32               rtp_jitter_buffer_get_clock_rate   (RTPJitterBuffer *jbuf);
 
+void                  rtp_jitter_buffer_set_media_clock  (RTPJitterBuffer *jbuf, GstClock * clock, guint64 clock_offset);
+void                  rtp_jitter_buffer_set_pipeline_clock (RTPJitterBuffer *jbuf, GstClock * clock);
+
+gboolean              rtp_jitter_buffer_get_rfc7273_sync (RTPJitterBuffer *jbuf);
+void                  rtp_jitter_buffer_set_rfc7273_sync (RTPJitterBuffer *jbuf, gboolean rfc7273_sync);
+
 void                  rtp_jitter_buffer_reset_skew       (RTPJitterBuffer *jbuf);
 
 gboolean              rtp_jitter_buffer_insert           (RTPJitterBuffer *jbuf,
                                                           RTPJitterBufferItem *item,
-                                                          gboolean *head, gint *percent);
+                                                          gboolean *head, gint *percent,
+                                                          GstClockTime base_time);
 
 void                  rtp_jitter_buffer_disable_buffering (RTPJitterBuffer *jbuf, gboolean disabled);
 
index b41666d..e2b3a41 100644 (file)
@@ -227,6 +227,7 @@ gst_rtsp_src_ntp_time_source_get_type (void)
 #define DEFAULT_NTP_TIME_SOURCE  NTP_TIME_SOURCE_NTP
 #define DEFAULT_USER_AGENT       "GStreamer/" PACKAGE_VERSION
 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
+#define DEFAULT_RFC7273_SYNC         FALSE
 
 enum
 {
@@ -265,7 +266,8 @@ enum
   PROP_DO_RETRANSMISSION,
   PROP_NTP_TIME_SOURCE,
   PROP_USER_AGENT,
-  PROP_MAX_RTCP_RTP_TIME_DIFF
+  PROP_MAX_RTCP_RTP_TIME_DIFF,
+  PROP_RFC7273_SYNC
 };
 
 #define GST_TYPE_RTSP_NAT_METHOD (gst_rtsp_nat_method_get_type())
@@ -732,6 +734,12 @@ gst_rtspsrc_class_init (GstRTSPSrcClass * klass)
           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
+      g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
+          "Synchronize received streams to the RFC7273 clock "
+          "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   /**
    * GstRTSPSrc::handle-request:
    * @rtspsrc: a #GstRTSPSrc
@@ -879,6 +887,7 @@ gst_rtspsrc_init (GstRTSPSrc * src)
   src->ntp_time_source = DEFAULT_NTP_TIME_SOURCE;
   src->user_agent = g_strdup (DEFAULT_USER_AGENT);
   src->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
+  src->rfc7273_sync = DEFAULT_RFC7273_SYNC;
 
   /* get a list of all extensions */
   src->extensions = gst_rtsp_ext_list_get ();
@@ -1158,6 +1167,9 @@ gst_rtspsrc_set_property (GObject * object, guint prop_id, const GValue * value,
     case PROP_MAX_RTCP_RTP_TIME_DIFF:
       rtspsrc->max_rtcp_rtp_time_diff = g_value_get_int (value);
       break;
+    case PROP_RFC7273_SYNC:
+      rtspsrc->rfc7273_sync = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1304,6 +1316,9 @@ gst_rtspsrc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_MAX_RTCP_RTP_TIME_DIFF:
       g_value_set_int (value, rtspsrc->max_rtcp_rtp_time_diff);
       break;
+    case PROP_RFC7273_SYNC:
+      g_value_set_boolean (value, rtspsrc->rfc7273_sync);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -2988,6 +3003,10 @@ gst_rtspsrc_stream_configure_manager (GstRTSPSrc * src, GstRTSPStream * stream,
         g_object_set (src->manager, "ntp-sync", src->ntp_sync, NULL);
       }
 
+      if (g_object_class_find_property (klass, "rfc7273-sync")) {
+        g_object_set (src->manager, "rfc7273-sync", src->rfc7273_sync, NULL);
+      }
+
       if (src->use_pipeline_clock) {
         if (g_object_class_find_property (klass, "use-pipeline-clock")) {
           g_object_set (src->manager, "use-pipeline-clock", TRUE, NULL);
index dc218dc..df1741c 100644 (file)
@@ -239,6 +239,7 @@ struct _GstRTSPSrc {
   gint              ntp_time_source;
   gchar            *user_agent;
   GstClockTime      max_rtcp_rtp_time_diff;
+  gboolean          rfc7273_sync;
 
   /* state */
   GstRTSPState       state;