filesink: Add a new full buffer mode to filesink
authorSebastian Dröge <sebastian@centricular.com>
Fri, 20 Mar 2020 17:28:37 +0000 (19:28 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Thu, 26 Mar 2020 11:31:03 +0000 (11:31 +0000)
Previously the default and full modes were the same. Now the default
mode is like before: it accumulates all buffers in a buffer list until
the threshold is reached and then writes them all out, potentially in
multiple writes.

The new full mode works by always copying memory to a single memory area
and writing everything out with a single write once the threshold is
reached.

plugins/elements/gstfilesink.c
plugins/elements/gstfilesink.h

index aab3c98..3381d0a 100644 (file)
@@ -431,15 +431,24 @@ gst_file_sink_open_file (GstFileSink * sink)
   sink->seekable = gst_file_sink_do_seek (sink, 0);
 
   if (sink->buffer)
-    gst_buffer_list_unref (sink->buffer);
+    g_free (sink->buffer);
   sink->buffer = NULL;
+  if (sink->buffer_list)
+    gst_buffer_list_unref (sink->buffer_list);
+  sink->buffer_list = NULL;
+
   if (sink->buffer_mode != GST_FILE_SINK_BUFFER_MODE_UNBUFFERED) {
     if (sink->buffer_size == 0) {
       sink->buffer_size = DEFAULT_BUFFER_SIZE;
       g_object_notify (G_OBJECT (sink), "buffer-size");
     }
 
-    sink->buffer = gst_buffer_list_new ();
+    if (sink->buffer_mode == GST_FILE_SINK_BUFFER_MODE_FULL) {
+      sink->buffer = g_malloc (sink->buffer_size);
+      sink->allocated_buffer_size = sink->buffer_size;
+    } else {
+      sink->buffer_list = gst_buffer_list_new ();
+    }
     sink->current_buffer_size = 0;
   }
 
@@ -481,9 +490,15 @@ gst_file_sink_close_file (GstFileSink * sink)
   }
 
   if (sink->buffer) {
-    gst_buffer_list_unref (sink->buffer);
+    g_free (sink->buffer);
     sink->buffer = NULL;
   }
+  sink->allocated_buffer_size = 0;
+
+  if (sink->buffer_list) {
+    gst_buffer_list_unref (sink->buffer_list);
+    sink->buffer_list = NULL;
+  }
   sink->current_buffer_size = 0;
 }
 
@@ -633,11 +648,11 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event)
         if (ftruncate (fileno (filesink->file), 0))
           goto truncate_failed;
       }
-      if (filesink->buffer) {
-        gst_buffer_list_unref (filesink->buffer);
-        filesink->buffer = gst_buffer_list_new ();
-        filesink->current_buffer_size = 0;
+      if (filesink->buffer_list) {
+        gst_buffer_list_unref (filesink->buffer_list);
+        filesink->buffer_list = gst_buffer_list_new ();
       }
+      filesink->current_buffer_size = 0;
       break;
     case GST_EVENT_EOS:
       if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
@@ -736,23 +751,36 @@ gst_file_sink_flush_buffer (GstFileSink * filesink)
 {
   GstFlowReturn flow_ret = GST_FLOW_OK;
 
-  if (filesink->buffer) {
+  GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %" G_GSIZE_FORMAT,
+      filesink->current_buffer_size);
+
+  if (filesink->buffer && filesink->current_buffer_size) {
+    guint64 bytes_written = 0;
+
+    flow_ret =
+        gst_writev_mem (GST_OBJECT_CAST (filesink), fileno (filesink->file),
+        NULL, filesink->buffer, filesink->current_buffer_size, &bytes_written,
+        0, filesink->max_transient_error_timeout, filesink->current_pos,
+        &filesink->flushing);
+
+    filesink->current_pos += bytes_written;
+
+  } else if (filesink->buffer_list && filesink->current_buffer_size) {
     guint length;
 
-    length = gst_buffer_list_length (filesink->buffer);
+    length = gst_buffer_list_length (filesink->buffer_list);
 
     if (length > 0) {
-      GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %u",
-          filesink->current_buffer_size);
       flow_ret =
-          gst_file_sink_render_list_internal (filesink, filesink->buffer);
+          gst_file_sink_render_list_internal (filesink, filesink->buffer_list);
       /* Remove all buffers from the list but keep the list. This ensures that
        * we don't re-allocate the array storing the buffers all the time */
-      gst_buffer_list_remove (filesink->buffer, 0, length);
-      filesink->current_buffer_size = 0;
+      gst_buffer_list_remove (filesink->buffer_list, 0, length);
     }
   }
 
+  filesink->current_buffer_size = 0;
+
   return flow_ret;
 }
 
@@ -796,7 +824,7 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
 
   gst_buffer_list_foreach (buffer_list, has_sync_after_buffer, &sync_after);
 
-  if (sync_after || !sink->buffer) {
+  if (sync_after || (!sink->buffer && !sink->buffer_list)) {
     flow = gst_file_sink_flush_buffer (sink);
     if (flow == GST_FLOW_OK)
       flow = gst_file_sink_render_list_internal (sink, buffer_list);
@@ -809,15 +837,52 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
         G_GUINT64_FORMAT, size, num_buffers,
         sink->current_pos + sink->current_buffer_size);
 
-    for (i = 0; i < num_buffers; ++i)
-      gst_buffer_list_add (sink->buffer,
-          gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
-    sink->current_buffer_size += size;
-
-    if (sink->current_buffer_size > sink->buffer_size)
-      flow = gst_file_sink_flush_buffer (sink);
-    else
+    if (sink->buffer) {
       flow = GST_FLOW_OK;
+      for (i = 0; i < num_buffers && flow == GST_FLOW_OK; i++) {
+        GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+        gsize buffer_size = gst_buffer_get_size (buffer);
+
+        if (sink->current_buffer_size + buffer_size >
+            sink->allocated_buffer_size) {
+          flow = gst_file_sink_flush_buffer (sink);
+          if (flow != GST_FLOW_OK)
+            return flow;
+        }
+
+        if (buffer_size > sink->allocated_buffer_size) {
+          guint64 bytes_written = 0;
+
+          GST_DEBUG_OBJECT (sink,
+              "writing buffer ( %" G_GSIZE_FORMAT
+              " bytes) at position %" G_GUINT64_FORMAT,
+              buffer_size, sink->current_pos);
+
+          flow =
+              gst_writev_buffer (GST_OBJECT_CAST (sink),
+              fileno (sink->file), NULL, buffer, &bytes_written, 0,
+              sink->max_transient_error_timeout, sink->current_pos,
+              &sink->flushing);
+
+          sink->current_pos += bytes_written;
+        } else {
+          sink->current_buffer_size +=
+              gst_buffer_extract (buffer, 0,
+              sink->buffer + sink->current_buffer_size, buffer_size);
+          flow = GST_FLOW_OK;
+        }
+      }
+    } else {
+      for (i = 0; i < num_buffers; ++i)
+        gst_buffer_list_add (sink->buffer_list,
+            gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
+      sink->current_buffer_size += size;
+
+      if (sink->current_buffer_size > sink->buffer_size)
+        flow = gst_file_sink_flush_buffer (sink);
+      else
+        flow = GST_FLOW_OK;
+    }
   }
 
   if (flow == GST_FLOW_OK && sync_after) {
@@ -856,7 +921,8 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
 
   n_mem = gst_buffer_n_memory (buffer);
 
-  if (n_mem > 0 && (sync_after || !filesink->buffer)) {
+  if (n_mem > 0 && (sync_after || (!filesink->buffer
+              && !filesink->buffer_list))) {
     flow = gst_file_sink_flush_buffer (filesink);
     if (flow == GST_FLOW_OK) {
       guint64 bytes_written = 0;
@@ -875,18 +941,51 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
       filesink->current_pos += bytes_written;
     }
   } else if (n_mem > 0) {
+    gsize size = gst_buffer_get_size (buffer);
+
     GST_DEBUG_OBJECT (filesink,
         "Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %"
-        G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
+        G_GUINT64_FORMAT, size,
         filesink->current_pos + filesink->current_buffer_size);
 
-    filesink->current_buffer_size += gst_buffer_get_size (buffer);
-    gst_buffer_list_add (filesink->buffer, gst_buffer_ref (buffer));
+    if (filesink->buffer) {
+      if (filesink->current_buffer_size + size >
+          filesink->allocated_buffer_size) {
+        flow = gst_file_sink_flush_buffer (filesink);
+        if (flow != GST_FLOW_OK)
+          return flow;
+      }
 
-    if (filesink->current_buffer_size > filesink->buffer_size)
-      flow = gst_file_sink_flush_buffer (filesink);
-    else
-      flow = GST_FLOW_OK;
+      if (size > filesink->allocated_buffer_size) {
+        guint64 bytes_written = 0;
+
+        GST_DEBUG_OBJECT (sink,
+            "writing buffer ( %" G_GSIZE_FORMAT
+            " bytes) at position %" G_GUINT64_FORMAT,
+            size, filesink->current_pos);
+
+        flow =
+            gst_writev_buffer (GST_OBJECT_CAST (filesink),
+            fileno (filesink->file), NULL, buffer, &bytes_written, 0,
+            filesink->max_transient_error_timeout, filesink->current_pos,
+            &filesink->flushing);
+
+        filesink->current_pos += bytes_written;
+      } else {
+        filesink->current_buffer_size +=
+            gst_buffer_extract (buffer, 0,
+            filesink->buffer + filesink->current_buffer_size, size);
+        flow = GST_FLOW_OK;
+      }
+    } else {
+      filesink->current_buffer_size += gst_buffer_get_size (buffer);
+      gst_buffer_list_add (filesink->buffer_list, gst_buffer_ref (buffer));
+
+      if (filesink->current_buffer_size > filesink->buffer_size)
+        flow = gst_file_sink_flush_buffer (filesink);
+      else
+        flow = GST_FLOW_OK;
+    }
   } else {
     flow = GST_FLOW_OK;
   }
index a5dc90b..bc479c5 100644 (file)
@@ -81,8 +81,15 @@ struct _GstFileSink {
   gint    buffer_mode;
   guint   buffer_size;
 
-  GstBufferList *buffer;
-  guint   current_buffer_size;
+  /* For default buffer mode */
+  GstBufferList *buffer_list;
+
+  /* For full buffer mode */
+  guint8 *buffer;
+  gsize   allocated_buffer_size;
+
+  /* For default/full buffer mode */
+  gsize current_buffer_size;
 
   gboolean append;
   gboolean o_sync;