qtmux: support non-seekable downstream mode
authorMatthew Waters <matthew@centricular.com>
Wed, 22 Jul 2020 05:34:44 +0000 (15:34 +1000)
committerMatthew Waters <matthew@centricular.com>
Mon, 28 Sep 2020 05:37:12 +0000 (15:37 +1000)
Write an mdat per buffer in that case.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/732>

gst/isomp4/gstqtmux.c
gst/isomp4/gstqtmux.h

index f8ef037..c2f1474 100644 (file)
@@ -398,6 +398,8 @@ gst_qt_mux_robust_recording_rewrite_moov (GstQTMux * qtmux);
 static void gst_qt_mux_update_global_statistics (GstQTMux * qtmux);
 static void gst_qt_mux_update_edit_lists (GstQTMux * qtmux);
 
+static GstFlowReturn gst_qtmux_push_mdat_stored_buffers (GstQTMux * qtmux);
+
 static GstElementClass *parent_class = NULL;
 
 static void
@@ -761,6 +763,9 @@ gst_qt_mux_reset (GstQTMux * qtmux, gboolean alloc)
   }
   GST_OBJECT_UNLOCK (qtmux);
 
+  g_list_free_full (qtmux->output_buffers, (GDestroyNotify) gst_buffer_unref);
+  qtmux->output_buffers = NULL;
+
   qtmux->current_pad = NULL;
   qtmux->current_chunk_size = 0;
   qtmux->current_chunk_duration = 0;
@@ -1867,7 +1872,7 @@ static GstFlowReturn
 gst_qt_mux_send_buffer (GstQTMux * qtmux, GstBuffer * buf, guint64 * offset,
     gboolean mind_fast)
 {
-  GstFlowReturn res;
+  GstFlowReturn res = GST_FLOW_OK;
   gsize size;
 
   g_return_val_if_fail (buf != NULL, GST_FLOW_ERROR);
@@ -1889,8 +1894,14 @@ gst_qt_mux_send_buffer (GstQTMux * qtmux, GstBuffer * buf, guint64 * offset,
     else
       res = GST_FLOW_OK;
   } else {
-    GST_LOG_OBJECT (qtmux, "downstream");
-    res = gst_aggregator_finish_buffer (GST_AGGREGATOR (qtmux), buf);
+    if (!mind_fast) {
+      res = gst_qtmux_push_mdat_stored_buffers (qtmux);
+    }
+
+    if (res == GST_FLOW_OK) {
+      GST_LOG_OBJECT (qtmux, "downstream");
+      res = gst_aggregator_finish_buffer (GST_AGGREGATOR (qtmux), buf);
+    }
   }
 
   if (res != GST_FLOW_OK)
@@ -2010,6 +2021,7 @@ gst_qt_mux_send_mdat_header (GstQTMux * qtmux, guint64 * off, guint64 size,
 {
   GstBuffer *buf;
   GstMapInfo map;
+  gboolean mind_fast = FALSE;
 
   GST_DEBUG_OBJECT (qtmux, "Sending mdat's atom header, "
       "size %" G_GUINT64_FORMAT, size);
@@ -2053,8 +2065,10 @@ gst_qt_mux_send_mdat_header (GstQTMux * qtmux, guint64 * off, guint64 size,
   if (fsync_after)
     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_SYNC_AFTER);
 
-  return gst_qt_mux_send_buffer (qtmux, buf, off, FALSE);
+  mind_fast = qtmux->mux_mode == GST_QT_MUX_MODE_MOOV_AT_END
+      && !qtmux->downstream_seekable;
 
+  return gst_qt_mux_send_buffer (qtmux, buf, off, mind_fast);
 }
 
 /*
@@ -3033,14 +3047,16 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       qtmux->mux_mode = GST_QT_MUX_MODE_ROBUST_RECORDING;
   }
 
+  qtmux->downstream_seekable = gst_qt_mux_downstream_is_seekable (qtmux);
   switch (qtmux->mux_mode) {
     case GST_QT_MUX_MODE_MOOV_AT_END:
+      break;
     case GST_QT_MUX_MODE_ROBUST_RECORDING:
       /* We have to be able to seek to rewrite the mdat header, or any
        * moov atom we write will not be visible in the file, because an
        * MDAT with 0 as the size covers the rest of the file. A file
        * with no moov is not playable, so error out now. */
-      if (!gst_qt_mux_downstream_is_seekable (qtmux)) {
+      if (!qtmux->downstream_seekable) {
         GST_ELEMENT_ERROR (qtmux, STREAM, MUX,
             ("Downstream is not seekable - will not be able to create a playable file"),
             (NULL));
@@ -3056,7 +3072,7 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
     case GST_QT_MUX_MODE_FRAGMENTED:
       if (qtmux->fragment_mode == GST_QT_MUX_FRAGMENT_STREAMABLE)
         break;
-      if (!gst_qt_mux_downstream_is_seekable (qtmux)) {
+      if (!qtmux->downstream_seekable) {
         GST_WARNING_OBJECT (qtmux, "downstream is not seekable, but "
             "streamable=false. Will ignore that and create streamable output "
             "instead");
@@ -3065,7 +3081,7 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       }
       break;
     case GST_QT_MUX_MODE_ROBUST_RECORDING_PREFILL:
-      if (!gst_qt_mux_downstream_is_seekable (qtmux)) {
+      if (!qtmux->downstream_seekable) {
         GST_WARNING_OBJECT (qtmux,
             "downstream is not seekable, will not be able "
             "to trim samples table at the end if less than reserved-duration is "
@@ -3176,9 +3192,10 @@ gst_qt_mux_start_file (GstQTMux * qtmux)
       qtmux->mdat_pos = qtmux->header_size;
       /* extended atom in case we go over 4GB while writing and need
        * the full 64-bit atom */
-      ret =
-          gst_qt_mux_send_mdat_header (qtmux, &qtmux->header_size, 0, TRUE,
-          FALSE);
+      if (qtmux->downstream_seekable)
+        ret =
+            gst_qt_mux_send_mdat_header (qtmux, &qtmux->header_size, 0, TRUE,
+            FALSE);
       break;
     case GST_QT_MUX_MODE_ROBUST_RECORDING:
       ret = gst_qt_mux_prepare_and_send_ftyp (qtmux);
@@ -3694,6 +3711,64 @@ gst_qt_mux_update_timecode (GstQTMux * qtmux, GstQTMuxPad * qtpad)
   return gst_qt_mux_send_buffer (qtmux, buf, &offset, FALSE);
 }
 
+static void
+unref_buffer_if_set (GstBuffer * buffer)
+{
+  if (buffer)
+    gst_buffer_unref (buffer);
+}
+
+static GstFlowReturn
+gst_qtmux_push_mdat_stored_buffers (GstQTMux * qtmux)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GList *l = qtmux->output_buffers;
+  guint64 mdat_header_size = 0, size = 0;
+
+  for (; l; l = g_list_next (l)) {
+    GstBuffer *buf = (GstBuffer *) l->data;
+
+    size += gst_buffer_get_size (buf);
+  }
+
+  if (size == 0)
+    return GST_FLOW_OK;
+
+  GST_DEBUG_OBJECT (qtmux, "Pushing stored buffers of size %" G_GUINT64_FORMAT
+      " current mdat size %" G_GUINT64_FORMAT, size, qtmux->mdat_size);
+
+  ret = gst_qt_mux_send_mdat_header (qtmux, &mdat_header_size, size,
+      size > MDAT_LARGE_FILE_LIMIT, FALSE);
+
+  /* reset chunking */
+  qtmux->current_chunk_size = 0;
+  qtmux->current_chunk_duration = 0;
+  qtmux->current_chunk_offset = -1;
+
+  /* on the first mdat, we need to offset the header by the mdat header size
+   * as the moov offset is in relation to the first data byte inside the first
+   * mdat */
+  if (qtmux->mdat_size == 0)
+    qtmux->header_size += mdat_header_size;
+  qtmux->mdat_size += mdat_header_size;
+
+  l = qtmux->output_buffers;
+  while (ret == GST_FLOW_OK && l) {
+    GstBuffer *buf = (GstBuffer *) l->data;
+
+    ret = gst_qt_mux_send_buffer (qtmux, buf, &qtmux->mdat_size, TRUE);
+
+    l->data = NULL;
+    l = g_list_next (l);
+  }
+
+  g_list_free_full (qtmux->output_buffers,
+      (GDestroyNotify) unref_buffer_if_set);
+  qtmux->output_buffers = NULL;
+
+  return ret;
+}
+
 static GstFlowReturn
 gst_qt_mux_stop_file (GstQTMux * qtmux)
 {
@@ -3739,6 +3814,14 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
   g_list_free_full (sinkpads, gst_object_unref);
 
   switch (qtmux->mux_mode) {
+    case GST_QT_MUX_MODE_MOOV_AT_END:{
+      if (!qtmux->downstream_seekable) {
+        ret = gst_qtmux_push_mdat_stored_buffers (qtmux);
+        if (ret != GST_FLOW_OK)
+          return ret;
+      }
+      break;
+    }
     case GST_QT_MUX_MODE_FRAGMENTED:{
       GstSegment segment;
       GstBuffer *buf;
@@ -4072,12 +4155,16 @@ gst_qt_mux_stop_file (GstQTMux * qtmux)
   switch (qtmux->mux_mode) {
     case GST_QT_MUX_MODE_MOOV_AT_END:
     {
-      /* mdat needs update iff not using faststart */
-      GST_DEBUG_OBJECT (qtmux, "updating mdat size");
-      ret = gst_qt_mux_update_mdat_size (qtmux, qtmux->mdat_pos,
-          qtmux->mdat_size, NULL, FALSE);
-      /* note; no seeking back to the end of file is done,
-       * since we no longer write anything anyway */
+      if (qtmux->downstream_seekable) {
+        /* mdat needs update iff not using faststart */
+        GST_DEBUG_OBJECT (qtmux, "updating mdat size at position %"
+            G_GUINT64_FORMAT " to size %" G_GUINT64_FORMAT, qtmux->mdat_pos,
+            qtmux->mdat_size);
+        ret = gst_qt_mux_update_mdat_size (qtmux, qtmux->mdat_pos,
+            qtmux->mdat_size, NULL, FALSE);
+        /* note; no seeking back to the end of file is done,
+         * since we no longer write anything anyway */
+      }
       break;
     }
     case GST_QT_MUX_MODE_FAST_START:
@@ -4752,7 +4839,13 @@ gst_qt_mux_register_and_push_sample (GstQTMux * qtmux, GstQTMuxPad * pad,
     case GST_QT_MUX_MODE_ROBUST_RECORDING:
       atom_trak_add_samples (pad->trak, nsamples, (gint32) scaled_duration,
           sample_size, chunk_offset, sync, pts_offset);
-      ret = gst_qt_mux_send_buffer (qtmux, buffer, &qtmux->mdat_size, TRUE);
+      if (qtmux->mux_mode == GST_QT_MUX_MODE_MOOV_AT_END
+          && !qtmux->downstream_seekable) {
+        qtmux->output_buffers = g_list_append (qtmux->output_buffers, buffer);
+        ret = GST_FLOW_OK;
+      } else {
+        ret = gst_qt_mux_send_buffer (qtmux, buffer, &qtmux->mdat_size, TRUE);
+      }
       /* Check if it's time to re-write the headers in robust-recording mode */
       if (ret == GST_FLOW_OK
           && qtmux->mux_mode == GST_QT_MUX_MODE_ROBUST_RECORDING)
@@ -4860,7 +4953,15 @@ gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTMuxPad * pad,
     g_assert (szret == 4);
 
     atom_trak_add_samples (pad->tc_trak, 1, 1, 4, *offset, FALSE, 0);
-    ret = gst_qt_mux_send_buffer (qtmux, tc_buf, offset, TRUE);
+
+    if (qtmux->mux_mode == GST_QT_MUX_MODE_MOOV_AT_END
+        && !qtmux->downstream_seekable) {
+      ret = gst_qtmux_push_mdat_stored_buffers (qtmux);
+      qtmux->output_buffers = g_list_append (qtmux->output_buffers, tc_buf);
+      ret = GST_FLOW_OK;
+    } else {
+      ret = gst_qt_mux_send_buffer (qtmux, tc_buf, offset, TRUE);
+    }
 
     /* Need to reset the current chunk (of the previous pad) here because
      * some other data was written now above, and the pad has to start a
@@ -4877,7 +4978,14 @@ gst_qt_mux_check_and_update_timecode (GstQTMux * qtmux, GstQTMuxPad * pad,
     szret = gst_buffer_fill (tc_buf, 0, &frames_since_daily_jam, 4);
     g_assert (szret == 4);
 
-    ret = gst_qt_mux_send_buffer (qtmux, tc_buf, &qtmux->mdat_size, TRUE);
+    if (qtmux->mux_mode == GST_QT_MUX_MODE_MOOV_AT_END
+        && !qtmux->downstream_seekable) {
+      ret = gst_qtmux_push_mdat_stored_buffers (qtmux);
+      qtmux->output_buffers = g_list_append (qtmux->output_buffers, tc_buf);
+      ret = GST_FLOW_OK;
+    } else {
+      ret = gst_qt_mux_send_buffer (qtmux, tc_buf, &qtmux->mdat_size, TRUE);
+    }
     pad->tc_pos = -1;
 
     qtmux->current_chunk_offset = -1;
@@ -5458,6 +5566,8 @@ find_best_pad (GstQTMux * qtmux)
           GST_DEBUG_PAD_NAME (best_pad));
     }
   } else {
+    gboolean push_stored = FALSE;
+
     GST_OBJECT_LOCK (qtmux);
     if (GST_ELEMENT (qtmux)->sinkpads->next || qtmux->force_chunks) {
       /* Only switch pads if we have more than one, otherwise
@@ -5471,8 +5581,12 @@ find_best_pad (GstQTMux * qtmux)
         GST_DEBUG_OBJECT (qtmux, "Switching from pad %s:%s",
             GST_DEBUG_PAD_NAME (qtmux->current_pad));
       best_pad = qtmux->current_pad = NULL;
+      push_stored = TRUE;
     }
     GST_OBJECT_UNLOCK (qtmux);
+
+    if (push_stored)
+      gst_qtmux_push_mdat_stored_buffers (qtmux);
   }
 
   if (!best_pad) {
index ca6c027..3778632 100644 (file)
@@ -231,6 +231,9 @@ struct _GstQTMux
    * @mux_mode == GST_QT_MUX_MODE_FRAGMENTED */
   GstQTMuxFragmentMode fragment_mode;
 
+  /* whether downstream is seekable */
+  gboolean downstream_seekable;
+
   /* size of header (prefix, atoms (ftyp, possibly moov, mdat header)) */
   guint64 header_size;
   /* accumulated size of raw media data (not including mdat header) */
@@ -261,6 +264,10 @@ struct _GstQTMux
   GstClockTime current_chunk_duration;
   guint64 current_chunk_offset;
 
+  /* list of buffers to hold for batching inside a single mdat when downstream
+   * is not seekable */
+  GList *output_buffers;
+
   /* atom helper objects */
   AtomsContext *context;
   AtomFTYP *ftyp;