mpegtsmux: Improve PCR/SI scheduling.
authorJan Schmidt <jan@centricular.com>
Thu, 11 Mar 2021 07:05:25 +0000 (18:05 +1100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 18 Mar 2021 13:57:27 +0000 (13:57 +0000)
Change PCR / SI scheduling so that instead of checking if
the current PCR is larger than the next target time, instead
check if the PCR of the next packet would be too late, so PCR
and SI are always scheduled earlier than the target, not later.

There are still cases where PCR can be written too late though,
because we don't check before each output packet.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2073>

gst/mpegtsmux/tsmux/tsmux.c

index 647d6e5..5a13106 100644 (file)
 #define TSMUX_DEFAULT_NETWORK_ID 0x0001
 #define TSMUX_DEFAULT_TS_ID 0x0001
 
+/* The last byte of the PCR in the header defines the byte position
+ * at which PCR should be calculated */
+#define PCR_BYTE_OFFSET 11
+
 /* HACK: We use a fixed buffering offset for the PCR at the moment -
  * this is the amount 'in advance' of the stream that the PCR sits.
  * 1/8 second atm */
@@ -1248,6 +1252,7 @@ ts_to_pcr (gint64 ts)
   return (ts - TSMUX_PCR_OFFSET) * (TSMUX_SYS_CLOCK_FREQ / TSMUX_CLOCK_FREQ);
 }
 
+/* Calculate the PCR to write into the current packet */
 static gint64
 get_current_pcr (TsMux * mux, gint64 cur_ts)
 {
@@ -1257,25 +1262,48 @@ get_current_pcr (TsMux * mux, gint64 cur_ts)
   if (mux->first_pcr_ts == G_MININT64) {
     g_assert (cur_ts != G_MININT64);
     mux->first_pcr_ts = cur_ts;
+    GST_DEBUG ("First PCR offset is %" G_GUINT64_FORMAT, cur_ts);
   }
 
   return ts_to_pcr (mux->first_pcr_ts) +
-      gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_SYS_CLOCK_FREQ,
-      mux->bitrate);
+      gst_util_uint64_scale ((mux->n_bytes + PCR_BYTE_OFFSET) * 8,
+      TSMUX_SYS_CLOCK_FREQ, mux->bitrate);
 }
 
+/* Predict the PCR at the next packet if possible */
 static gint64
-write_new_pcr (TsMux * mux, TsMuxStream * stream, gint64 cur_pcr)
+get_next_pcr (TsMux * mux, gint64 cur_ts)
 {
-  if (stream->next_pcr == -1 || cur_pcr > stream->next_pcr) {
+  if (!mux->bitrate)
+    return ts_to_pcr (cur_ts);
+
+  if (mux->first_pcr_ts == G_MININT64) {
+    g_assert (cur_ts != G_MININT64);
+    mux->first_pcr_ts = cur_ts;
+    GST_DEBUG ("First PCR offset is %" G_GUINT64_FORMAT, cur_ts);
+  }
+
+  return ts_to_pcr (mux->first_pcr_ts) +
+      gst_util_uint64_scale ((mux->n_bytes + TSMUX_PACKET_LENGTH +
+          PCR_BYTE_OFFSET) * 8, TSMUX_SYS_CLOCK_FREQ, mux->bitrate);
+}
+
+static gint64
+write_new_pcr (TsMux * mux, TsMuxStream * stream, gint64 cur_pcr,
+    gint64 next_pcr)
+{
+  if (stream->next_pcr == -1 || next_pcr > stream->next_pcr) {
     stream->pi.flags |=
         TSMUX_PACKET_FLAG_ADAPTATION | TSMUX_PACKET_FLAG_WRITE_PCR;
     stream->pi.pcr = cur_pcr;
 
-    if (stream->next_pcr == -1)
-      stream->next_pcr = cur_pcr + mux->pcr_interval * 300;
-    else
-      stream->next_pcr += mux->pcr_interval * 300;
+    if (stream->next_pcr != -1 && cur_pcr >= stream->next_pcr) {
+      GST_WARNING ("Writing PCR %" G_GUINT64_FORMAT " missed the target %"
+          G_GUINT64_FORMAT " by %f ms", cur_pcr, stream->next_pcr,
+          (double) (cur_pcr - stream->next_pcr) / 27000.0);
+    }
+    /* Next PCR deadline is now plus the scheduled interval */
+    stream->next_pcr = cur_pcr + mux->pcr_interval * 300;
   } else {
     cur_pcr = -1;
   }
@@ -1289,48 +1317,48 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
   gboolean write_pat;
   gboolean write_si;
   GList *cur;
-  gint64 cur_pcr;
+  gint64 next_pcr;
 
-  cur_pcr = get_current_pcr (mux, cur_ts);
+  next_pcr = get_next_pcr (mux, cur_ts);
 
   /* check if we need to rewrite pat */
   if (mux->next_pat_pcr == -1 || mux->pat_changed)
     write_pat = TRUE;
-  else if (cur_pcr > mux->next_pat_pcr)
+  else if (next_pcr > mux->next_pat_pcr)
     write_pat = TRUE;
   else
     write_pat = FALSE;
 
   if (write_pat) {
     if (mux->next_pat_pcr == -1)
-      mux->next_pat_pcr = cur_pcr + mux->pat_interval * 300;
+      mux->next_pat_pcr = next_pcr + mux->pat_interval * 300;
     else
       mux->next_pat_pcr += mux->pat_interval * 300;
 
     if (!tsmux_write_pat (mux))
       return FALSE;
 
-    cur_pcr = get_current_pcr (mux, cur_ts);
+    next_pcr = get_next_pcr (mux, cur_ts);
   }
 
   /* check if we need to rewrite sit */
   if (mux->next_si_pcr == -1 || mux->si_changed)
     write_si = TRUE;
-  else if (cur_pcr > mux->next_si_pcr)
+  else if (next_pcr > mux->next_si_pcr)
     write_si = TRUE;
   else
     write_si = FALSE;
 
   if (write_si) {
     if (mux->next_si_pcr == -1)
-      mux->next_si_pcr = cur_pcr + mux->si_interval * 300;
+      mux->next_si_pcr = next_pcr + mux->si_interval * 300;
     else
       mux->next_si_pcr += mux->si_interval * 300;
 
     if (!tsmux_write_si (mux))
       return FALSE;
 
-    cur_pcr = get_current_pcr (mux, cur_ts);
+    next_pcr = get_current_pcr (mux, cur_ts);
   }
 
   /* check if we need to rewrite any of the current pmts */
@@ -1340,28 +1368,28 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
 
     if (program->next_pmt_pcr == -1 || program->pmt_changed)
       write_pmt = TRUE;
-    else if (cur_pcr > program->next_pmt_pcr)
+    else if (next_pcr > program->next_pmt_pcr)
       write_pmt = TRUE;
     else
       write_pmt = FALSE;
 
     if (write_pmt) {
       if (program->next_pmt_pcr == -1)
-        program->next_pmt_pcr = cur_pcr + program->pmt_interval * 300;
+        program->next_pmt_pcr = next_pcr + program->pmt_interval * 300;
       else
         program->next_pmt_pcr += program->pmt_interval * 300;
 
       if (!tsmux_write_pmt (mux, program))
         return FALSE;
 
-      cur_pcr = get_current_pcr (mux, cur_ts);
+      next_pcr = get_current_pcr (mux, cur_ts);
     }
 
     if (program->scte35_pid != 0) {
       gboolean write_scte_null = FALSE;
       if (program->next_scte35_pcr == -1)
         write_scte_null = TRUE;
-      else if (cur_pcr > program->next_scte35_pcr)
+      else if (next_pcr > program->next_scte35_pcr)
         write_scte_null = TRUE;
 
       if (write_scte_null) {
@@ -1369,7 +1397,7 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
             program->next_scte35_pcr);
         if (program->next_scte35_pcr == -1)
           program->next_scte35_pcr =
-              cur_pcr + program->scte35_null_interval * 300;
+              next_pcr + program->scte35_null_interval * 300;
         else
           program->next_scte35_pcr += program->scte35_null_interval * 300;
         GST_DEBUG ("next scte35 NOW pcr %" G_GINT64_FORMAT,
@@ -1378,7 +1406,7 @@ rewrite_si (TsMux * mux, gint64 cur_ts)
         if (!tsmux_write_scte_null (mux, program))
           return FALSE;
 
-        cur_pcr = get_current_pcr (mux, cur_ts);
+        next_pcr = get_current_pcr (mux, cur_ts);
       }
     }
   }
@@ -1393,66 +1421,73 @@ pad_stream (TsMux * mux, TsMuxStream * stream, gint64 cur_ts)
   GstBuffer *buf = NULL;
   GstMapInfo map;
   gboolean ret = TRUE;
+  GstClockTimeDiff diff;
+  guint64 start_n_bytes;
 
   if (!mux->bitrate)
     goto done;
 
-  do {
-    if (GST_CLOCK_STIME_IS_VALID (cur_ts)) {
-      GstClockTimeDiff diff;
-
-      if (!GST_CLOCK_STIME_IS_VALID (stream->first_ts))
-        stream->first_ts = cur_ts;
-
-      diff = GST_CLOCK_DIFF (stream->first_ts, cur_ts);
-
-      if (diff) {
-        bitrate =
-            gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_CLOCK_FREQ, diff);
-
-        GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT, bitrate);
-
-        if (bitrate < mux->bitrate) {
-          gint64 new_pcr;
-          guint payload_len, payload_offs;
+  if (!GST_CLOCK_STIME_IS_VALID (cur_ts))
+    goto done;
 
-          GST_LOG ("Padding transport stream");
+  if (!GST_CLOCK_STIME_IS_VALID (stream->first_ts))
+    stream->first_ts = cur_ts;
 
-          if (!rewrite_si (mux, cur_ts)) {
-            ret = FALSE;
-            goto done;
-          }
+  diff = GST_CLOCK_DIFF (stream->first_ts, cur_ts);
+  if (diff == 0)
+    goto done;
 
-          if (!tsmux_get_buffer (mux, &buf)) {
-            ret = FALSE;
-            goto done;
-          }
+  start_n_bytes = mux->n_bytes;
+  do {
+    GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT " over %"
+        G_GUINT64_FORMAT " bytes, duration %" GST_TIME_FORMAT,
+        gst_util_uint64_scale (mux->n_bytes * 8, TSMUX_CLOCK_FREQ, diff),
+        mux->n_bytes, GST_TIME_ARGS (diff * GST_SECOND / TSMUX_CLOCK_FREQ));
+
+    /* calculate what the overall bitrate will be if we add 1 more packet */
+    bitrate =
+        gst_util_uint64_scale ((mux->n_bytes + TSMUX_PACKET_LENGTH) * 8,
+        TSMUX_CLOCK_FREQ, diff);
+
+    if (bitrate <= mux->bitrate) {
+      gint64 new_pcr;
+      guint payload_len, payload_offs;
+
+      if (!tsmux_get_buffer (mux, &buf)) {
+        ret = FALSE;
+        goto done;
+      }
 
-          gst_buffer_map (buf, &map, GST_MAP_READ);
+      gst_buffer_map (buf, &map, GST_MAP_READ);
 
-          if ((new_pcr =
-                  write_new_pcr (mux, stream, get_current_pcr (mux,
-                          cur_ts)) != -1))
-            tsmux_write_ts_header (mux, map.data, &stream->pi, &payload_len,
-                &payload_offs, 0);
-          else
-            tsmux_write_null_ts_header (map.data);
+      if ((new_pcr =
+              write_new_pcr (mux, stream, get_current_pcr (mux,
+                      cur_ts), get_next_pcr (mux, cur_ts)) != -1)) {
+        GST_LOG ("Writing PCR-only packet on PID 0x%04x", stream->pi.pid);
+        tsmux_write_ts_header (mux, map.data, &stream->pi, &payload_len,
+            &payload_offs, 0);
+      } else {
+        GST_LOG ("Writing null stuffing packet");
+        if (!rewrite_si (mux, cur_ts)) {
+          ret = FALSE;
+          goto done;
+        }
+        tsmux_write_null_ts_header (map.data);
+      }
 
-          gst_buffer_unmap (buf, &map);
+      gst_buffer_unmap (buf, &map);
 
-          stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
+      stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
 
-          if (!(ret = tsmux_packet_out (mux, buf, new_pcr)))
-            goto done;
-        }
-      } else {
-        break;
-      }
-    } else {
-      break;
+      if (!(ret = tsmux_packet_out (mux, buf, new_pcr)))
+        goto done;
     }
   } while (bitrate < mux->bitrate);
 
+  if (mux->n_bytes != start_n_bytes) {
+    GST_LOG ("Finished padding the mux");
+  }
+
 done:
   return ret;
 }
@@ -1481,7 +1516,6 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
 
   if (tsmux_stream_is_pcr (stream)) {
     gint64 cur_ts = CLOCK_BASE;
-
     if (tsmux_stream_get_dts (stream) != G_MININT64)
       cur_ts += tsmux_stream_get_dts (stream);
     else
@@ -1493,7 +1527,9 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
     if (!pad_stream (mux, stream, cur_ts))
       goto fail;
 
-    new_pcr = write_new_pcr (mux, stream, get_current_pcr (mux, cur_ts));
+    new_pcr =
+        write_new_pcr (mux, stream, get_current_pcr (mux, cur_ts),
+        get_next_pcr (mux, cur_ts));
   }
 
   pi->packet_start_unit_indicator = tsmux_stream_at_pes_start (stream);