srtsrc: Fix timestamping
authorEdward Hervey <edward@centricular.com>
Tue, 6 Oct 2020 09:45:36 +0000 (11:45 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 8 Oct 2020 21:12:17 +0000 (21:12 +0000)
SRT provides the original timestamp of a packet (with drift/skew corrected for
local clock), which is what should be used for timestamping the outgoing
buffers. This ensures that we output the packets with the same timestamp (and by
extension rate) as the original feed.

Also detect if packets were dropped (by checking the sequence number) and
properly set DISCONT flag on the outgoing buffer.

Finally answer the latency queries

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1658>

ext/srt/gstsrtobject.c
ext/srt/gstsrtobject.h
ext/srt/gstsrtsrc.c
ext/srt/gstsrtsrc.h

index 3cdd1c1aa425ee0f87685aa361801ca19cc2c1b7..6a0bd5a790a8fe08e0eb9baaa5dd5df3975af2f9 100644 (file)
@@ -1247,7 +1247,8 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
 
 gssize
 gst_srt_object_read (GstSRTObject * srtobject,
-    guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
+    guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
+    SRT_MSGCTRL * mctrl)
 {
   gssize len = 0;
   gint poll_timeout;
@@ -1335,7 +1336,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
     }
 
 
-    len = srt_recvmsg (rsock, (char *) (data), size);
+    srt_msgctrl_init (mctrl);
+    len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
 
     if (len == SRT_ERROR) {
       gint srt_errno = srt_getlasterror (NULL);
index 302aa89cc86fda52608a48bb6dfd488c974ebc0c..09950feaf271a5c41676eeb874fe6a5a7a46f324 100644 (file)
@@ -98,7 +98,8 @@ gboolean        gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar *u
 gssize          gst_srt_object_read     (GstSRTObject * srtobject,
                                          guint8 *data, gsize size,
                                          GCancellable *cancellable,
-                                         GError **err);
+                                         GError **err,
+                                        SRT_MSGCTRL *mctrl);
 
 gssize          gst_srt_object_write    (GstSRTObject * srtobject,
                                          GstBufferList * headers,
index 2b898ea79a32783defe0a104b2217a0e7e06ab84..c886e39f32653e83ea766243a37d4066ce17b2a8 100644 (file)
@@ -97,6 +97,9 @@ gst_srt_src_start (GstBaseSrc * bsrc)
     g_clear_error (&error);
   }
 
+  /* Reset expected pktseq */
+  self->next_pktseq = 0;
+
   return ret;
 }
 
@@ -118,6 +121,11 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
   GstMapInfo info;
   GError *err = NULL;
   gssize recv_len;
+  GstClock *clock;
+  GstClockTime base_time;
+  GstClockTime capture_time;
+  GstClockTime delay;
+  SRT_MSGCTRL mctrl;
 
   if (g_cancellable_is_cancelled (self->cancellable)) {
     ret = GST_FLOW_FLUSHING;
@@ -130,11 +138,30 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
     goto out;
   }
 
+  /* Get clock and values */
+  clock = gst_element_get_clock (GST_ELEMENT (src));
+  base_time = gst_element_get_base_time (GST_ELEMENT (src));
+
   recv_len = gst_srt_object_read (self->srtobject, info.data,
-      gst_buffer_get_size (outbuf), self->cancellable, &err);
+      gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl);
+
+  /* Capture clock values ASAP */
+  capture_time = gst_clock_get_time (clock);
+#if SRT_VERSION_VALUE >= 0x10402
+  /* Use SRT clock value if available (SRT > 1.4.2) */
+  delay = (srt_time_now () - mctrl.srctime) * GST_USECOND;
+#else
+  /* Else use the unix epoch monotonic clock */
+  delay = (g_get_real_time () - mctrl.srctime) * GST_USECOND;
+#endif
+  gst_object_unref (clock);
 
   gst_buffer_unmap (outbuf, &info);
 
+  GST_LOG_OBJECT (src,
+      "recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%"
+      G_GUINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime);
+
   if (g_cancellable_is_cancelled (self->cancellable)) {
     ret = GST_FLOW_FLUSHING;
     goto out;
@@ -150,6 +177,29 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
     goto out;
   }
 
+  /* Detect discontinuities */
+  if (mctrl.pktseq != self->next_pktseq) {
+    GST_WARNING_OBJECT (src, "discont detected %d (expected: %d)",
+        mctrl.pktseq, self->next_pktseq);
+    GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+  }
+  /* pktseq is a 31bit field */
+  self->next_pktseq = (mctrl.pktseq + 1) % G_MAXINT32;
+
+  /* Subtract the base_time (since the pipeline started) ... */
+  if (capture_time > base_time)
+    capture_time -= base_time;
+  else
+    capture_time = 0;
+  /* And adjust by the delay */
+  if (capture_time > delay)
+    capture_time -= delay;
+  else
+    capture_time = 0;
+  GST_BUFFER_TIMESTAMP (outbuf) = capture_time;
+
+  GST_DEBUG_OBJECT (src, "delay:%" GST_TIME_FORMAT, GST_TIME_ARGS (delay));
+
   gst_buffer_resize (outbuf, 0, recv_len);
 
   GST_LOG_OBJECT (src,
@@ -173,7 +223,8 @@ gst_srt_src_init (GstSRTSrc * self)
 
   gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
   gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
-  gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
+  /* We do the timing ourselves */
+  gst_base_src_set_do_timestamp (GST_BASE_SRC (self), FALSE);
 
   gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
 
@@ -234,6 +285,24 @@ gst_srt_src_get_property (GObject * object,
   }
 }
 
+static gboolean
+gst_srt_src_query (GstBaseSrc * basesrc, GstQuery * query)
+{
+  GstSRTSrc *self = GST_SRT_SRC (basesrc);
+
+  if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
+    gint latency;
+    if (!gst_structure_get_int (self->srtobject->parameters, "latency",
+            &latency))
+      latency = GST_SRT_DEFAULT_LATENCY;
+    gst_query_set_latency (query, TRUE, latency * GST_MSECOND,
+        latency * GST_MSECOND);
+    return TRUE;
+  } else {
+    return GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+  }
+}
+
 static void
 gst_srt_src_class_init (GstSRTSrcClass * klass)
 {
@@ -285,6 +354,7 @@ gst_srt_src_class_init (GstSRTSrcClass * klass)
   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop);
   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock);
   gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop);
+  gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_srt_src_query);
 
   gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill);
 }
index af0b83362eb3bb47a568c7d41cfa10b97246d713..0ca9a4b84a240c8b924af15a2ead31fd2408010c 100644 (file)
@@ -49,6 +49,8 @@ struct _GstSRTSrc {
 
   GstSRTObject *srtobject;
   GCancellable *cancellable;
+
+  guint32       next_pktseq;
 };
 
 struct _GstSRTSrcClass {