rtpsession: Update 64-bit NTP header extensions with the actual NTP time in senders
authorSebastian Dröge <sebastian@centricular.com>
Wed, 6 Apr 2022 12:39:14 +0000 (15:39 +0300)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 20 Apr 2022 14:40:25 +0000 (14:40 +0000)
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/2132>

subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c
subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.c
subprojects/gst-plugins-good/gst/rtpmanager/rtpsession.h
subprojects/gst-plugins-good/gst/rtpmanager/rtpstats.h

index cc9d59f..0f018aa 100644 (file)
@@ -2402,6 +2402,8 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
   GstFlowReturn ret;
   GstClockTime timestamp, running_time;
   GstClockTime current_time;
+  guint64 ntpnstime;
+  GstClock *clock;
 
   priv = rtpsession->priv;
 
@@ -2435,8 +2437,83 @@ gst_rtp_session_chain_send_rtp_common (GstRtpSession * rtpsession,
   }
 
   current_time = gst_clock_get_time (priv->sysclock);
+
+  /* Calculate the NTP time of this packet based on the session configuration
+   * and the running time from above */
+  GST_OBJECT_LOCK (rtpsession);
+  if (running_time != -1 && (clock = GST_ELEMENT_CLOCK (rtpsession))) {
+    GstClockTime base_time;
+    base_time = GST_ELEMENT_CAST (rtpsession)->base_time;
+    gst_object_ref (clock);
+    GST_OBJECT_UNLOCK (rtpsession);
+
+    if (rtpsession->priv->use_pipeline_clock) {
+      ntpnstime = running_time;
+      /* add constant to convert from 1970 based time to 1900 based time */
+      ntpnstime += (2208988800LL * GST_SECOND);
+    } else {
+      switch (rtpsession->priv->ntp_time_source) {
+        case GST_RTP_NTP_TIME_SOURCE_NTP:
+        case GST_RTP_NTP_TIME_SOURCE_UNIX:{
+          GstClockTime wallclock_now, pipeline_now;
+
+          /* pipeline clock time for this packet */
+          ntpnstime = running_time + base_time;
+
+          /* get current wallclock and pipeline clock time */
+          wallclock_now = g_get_real_time () * GST_USECOND;
+          pipeline_now = gst_clock_get_time (clock);
+
+          /* adjust pipeline clock time by the current diff.
+           * Note that this will include some jitter for each packet */
+          if (wallclock_now > pipeline_now) {
+            GstClockTime diff = wallclock_now - pipeline_now;
+
+            ntpnstime += diff;
+          } else {
+            GstClockTime diff = pipeline_now - wallclock_now;
+
+            if (diff > ntpnstime) {
+              /* This can't really happen unless the clock configuration is
+               * broken */
+              ntpnstime = GST_CLOCK_TIME_NONE;
+            } else {
+              ntpnstime -= diff;
+            }
+          }
+
+          /* add constant to convert from 1970 based time to 1900 based time */
+          if (ntpnstime != GST_CLOCK_TIME_NONE
+              && rtpsession->priv->ntp_time_source ==
+              GST_RTP_NTP_TIME_SOURCE_NTP)
+            ntpnstime += (2208988800LL * GST_SECOND);
+          break;
+        }
+        case GST_RTP_NTP_TIME_SOURCE_RUNNING_TIME:
+          ntpnstime = running_time;
+          break;
+        case GST_RTP_NTP_TIME_SOURCE_CLOCK_TIME:
+          ntpnstime = running_time + base_time;
+          break;
+        default:
+          ntpnstime = -1;
+          g_assert_not_reached ();
+          break;
+      }
+    }
+
+    gst_object_unref (clock);
+  } else {
+    if (!GST_ELEMENT_CLOCK (rtpsession)) {
+      GST_WARNING_OBJECT (rtpsession,
+          "Don't have a clock yet and can't determine NTP time for this packet");
+    }
+    GST_OBJECT_UNLOCK (rtpsession);
+    ntpnstime = GST_CLOCK_TIME_NONE;
+  }
+
   ret = rtp_session_send_rtp (priv->session, data, is_list, current_time,
-      running_time);
+      running_time, ntpnstime);
   if (ret != GST_FLOW_OK)
     goto push_error;
 
index 80b788a..fdb17be 100644 (file)
@@ -2118,6 +2118,25 @@ update_packet (GstBuffer ** buffer, guint idx, RTPPacketInfo * pinfo)
       pinfo->header_ext = gst_rtp_buffer_get_extension_bytes (&rtp,
           &pinfo->header_ext_bit_pattern);
     }
+
+    if (pinfo->ntp64_ext_id != 0 && pinfo->send && !pinfo->have_ntp64_ext) {
+      guint8 *data;
+      guint size;
+
+      /* Remember here that there is a 64-bit NTP header extension on this buffer
+       * or any of the other buffers in the buffer list.
+       * Later we update this after making the buffer(list) writable.
+       */
+      if ((gst_rtp_buffer_get_extension_onebyte_header (&rtp,
+                  pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
+              && size == 8)
+          || (gst_rtp_buffer_get_extension_twobytes_header (&rtp, NULL,
+                  pinfo->ntp64_ext_id, 0, (gpointer *) & data, &size)
+              && size == 8)) {
+        pinfo->have_ntp64_ext = TRUE;
+      }
+    }
+
     gst_rtp_buffer_unmap (&rtp);
   }
 
@@ -2166,6 +2185,8 @@ update_packet_info (RTPSession * sess, RTPPacketInfo * pinfo,
   pinfo->payload_len = 0;
   pinfo->packets = 0;
   pinfo->marker = FALSE;
+  pinfo->ntp64_ext_id = send ? sess->send_ntp64_ext_id : 0;
+  pinfo->have_ntp64_ext = FALSE;
 
   if (is_list) {
     GstBufferList *list = GST_BUFFER_LIST_CAST (data);
@@ -3125,6 +3146,29 @@ invalid_packet:
   }
 }
 
+static guint8
+_get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
+{
+  guint i;
+  guint8 extmap_id = 0;
+  guint n_fields = gst_structure_n_fields (s);
+
+  for (i = 0; i < n_fields; i++) {
+    const gchar *field_name = gst_structure_nth_field_name (s, i);
+    if (g_str_has_prefix (field_name, "extmap-")) {
+      const gchar *str = gst_structure_get_string (s, field_name);
+      if (str && g_strcmp0 (str, ext_name) == 0) {
+        gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
+        if (id > 0 && id < 15) {
+          extmap_id = id;
+          break;
+        }
+      }
+    }
+  }
+  return extmap_id;
+}
+
 /**
  * rtp_session_update_send_caps:
  * @sess: an #RTPSession
@@ -3180,9 +3224,151 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
     sess->internal_ssrc_from_caps_or_property = FALSE;
   }
 
+  sess->send_ntp64_ext_id =
+      _get_extmap_id_for_attribute (s,
+      GST_RTP_HDREXT_BASE GST_RTP_HDREXT_NTP_64);
+
   rtp_twcc_manager_parse_send_ext_id (sess->twcc, s);
 }
 
+static void
+update_ntp64_header_ext_data (RTPPacketInfo * pinfo, GstBuffer * buffer)
+{
+  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+  if (gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtp)) {
+    guint16 bits;
+    guint8 *data;
+    guint wordlen;
+
+    if (gst_rtp_buffer_get_extension_data (&rtp, &bits, (gpointer *) & data,
+            &wordlen)) {
+      gsize len = wordlen * 4;
+
+      /* One-byte header */
+      if (bits == 0xBEDE) {
+        /* One-byte header extension */
+        while (TRUE) {
+          guint8 ext_id, ext_len;
+
+          if (len < 1)
+            break;
+
+          ext_id = GST_READ_UINT8 (data) >> 4;
+          ext_len = (GST_READ_UINT8 (data) & 0xF) + 1;
+          data += 1;
+          len -= 1;
+          if (ext_id == 0) {
+            /* Skip padding */
+            continue;
+          } else if (ext_id == 15) {
+            /* Stop parsing */
+            break;
+          }
+
+          /* extension doesn't fit into the header */
+          if (ext_len > len)
+            break;
+
+          if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
+            if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
+              guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
+                  G_GUINT64_CONSTANT (1) << 32,
+                  GST_SECOND);
+
+              GST_WRITE_UINT64_BE (data, ntptime);
+            } else {
+              /* Replace extension with padding */
+              memset (data - 1, 0, 1 + ext_len);
+            }
+          }
+
+          /* skip to the next extension */
+          data += ext_len;
+          len -= ext_len;
+        }
+      } else if ((bits >> 4) == 0x100) {
+        /* Two-byte header extension */
+
+        while (TRUE) {
+          guint8 ext_id, ext_len;
+
+          if (len < 1)
+            break;
+
+          ext_id = GST_READ_UINT8 (data);
+          data += 1;
+          len -= 1;
+          if (ext_id == 0) {
+            /* Skip padding */
+            continue;
+          }
+
+          ext_len = GST_READ_UINT8 (data);
+          data += 1;
+          len -= 1;
+
+          /* extension doesn't fit into the header */
+          if (ext_len > len)
+            break;
+
+          if (ext_id == pinfo->ntp64_ext_id && ext_len == 8) {
+            if (pinfo->ntpnstime != GST_CLOCK_TIME_NONE) {
+              guint64 ntptime = gst_util_uint64_scale (pinfo->ntpnstime,
+                  G_GUINT64_CONSTANT (1) << 32,
+                  GST_SECOND);
+
+              GST_WRITE_UINT64_BE (data, ntptime);
+            } else {
+              /* Replace extension with padding */
+              memset (data - 2, 0, 2 + ext_len);
+            }
+          }
+
+          /* skip to the next extension */
+          data += ext_len;
+          len -= ext_len;
+        }
+      }
+    }
+    gst_rtp_buffer_unmap (&rtp);
+  }
+}
+
+static void
+update_ntp64_header_ext (RTPPacketInfo * pinfo)
+{
+  /* Early return if we don't know the header extension id or the packets
+   * don't contain the header extension */
+  if (pinfo->ntp64_ext_id == 0 || !pinfo->have_ntp64_ext)
+    return;
+
+  /* If no NTP time is known then the header extension will be replaced with
+   * padding, otherwise it will be updated */
+  GST_TRACE
+      ("Updating NTP-64 header extension for SSRC %08x packet with RTP time %u and running time %"
+      GST_TIME_FORMAT " to %" GST_TIME_FORMAT, pinfo->ssrc, pinfo->rtptime,
+      GST_TIME_ARGS (pinfo->running_time), GST_TIME_ARGS (pinfo->ntpnstime));
+
+  if (GST_IS_BUFFER_LIST (pinfo->data)) {
+    GstBufferList *list;
+    guint i = 0;
+
+    pinfo->data = gst_buffer_list_make_writable (pinfo->data);
+
+    list = GST_BUFFER_LIST (pinfo->data);
+
+    for (i = 0; i < gst_buffer_list_length (list); i++) {
+      GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
+
+      update_ntp64_header_ext_data (pinfo, buffer);
+    }
+  } else {
+    pinfo->data = gst_buffer_make_writable (pinfo->data);
+    update_ntp64_header_ext_data (pinfo, pinfo->data);
+  }
+}
+
 /**
  * rtp_session_send_rtp:
  * @sess: an #RTPSession
@@ -3198,7 +3384,7 @@ rtp_session_update_send_caps (RTPSession * sess, GstCaps * caps)
  */
 GstFlowReturn
 rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
-    GstClockTime current_time, GstClockTime running_time)
+    GstClockTime current_time, GstClockTime running_time, guint64 ntpnstime)
 {
   GstFlowReturn result;
   RTPSource *source;
@@ -3214,9 +3400,11 @@ rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list,
 
   RTP_SESSION_LOCK (sess);
   if (!update_packet_info (sess, &pinfo, TRUE, TRUE, is_list, data,
-          current_time, running_time, -1))
+          current_time, running_time, ntpnstime))
     goto invalid_packet;
 
+  /* Update any 64-bit NTP header extensions with the actual NTP time here */
+  update_ntp64_header_ext (&pinfo);
   rtp_twcc_manager_send_packet (sess->twcc, &pinfo);
 
   source = obtain_internal_source (sess, pinfo.ssrc, &created, current_time);
index 4c6d18c..d68aa3c 100644 (file)
@@ -309,6 +309,9 @@ struct _RTPSession {
 
   gboolean timestamp_sender_reports;
 
+  /* RFC6051 64-bit NTP header extension */
+  guint8 send_ntp64_ext_id;
+
   /* Transport-wide cc-extension */
   RTPTWCCManager *twcc;
   RTPTWCCStats *twcc_stats;
@@ -410,7 +413,8 @@ GstFlowReturn   rtp_session_process_rtcp           (RTPSession *sess, GstBuffer
 /* processing packets for sending */
 void            rtp_session_update_send_caps       (RTPSession *sess, GstCaps *caps);
 GstFlowReturn   rtp_session_send_rtp               (RTPSession *sess, gpointer data, gboolean is_list,
-                                                    GstClockTime current_time, GstClockTime running_time);
+                                                    GstClockTime current_time, GstClockTime running_time,
+                                                    guint64 ntpnstime);
 
 /* scheduling bye */
 void            rtp_session_mark_all_bye           (RTPSession *sess, const gchar *reason);
index 27f80cd..45ad377 100644 (file)
@@ -83,6 +83,9 @@ typedef struct {
  * @csrcs: CSRCs
  * @header_ext: Header extension data
  * @header_ext_bit_pattern: Header extension bit pattern
+ * @ntp64_ext_id: Extension header ID for RFC6051 64-bit NTP timestamp.
+ * @have_ntp64_ext: If there is at least one 64-bit NTP timestamp header
+ *     extension.
  *
  * Structure holding information about the packet.
  */
@@ -109,6 +112,8 @@ typedef struct {
   guint32       csrcs[16];
   GBytes        *header_ext;
   guint16       header_ext_bit_pattern;
+  guint8        ntp64_ext_id;
+  gboolean      have_ntp64_ext;
 } RTPPacketInfo;
 
 /**