filesink: Implement buffering internally
authorSebastian Dröge <sebastian@centricular.com>
Tue, 14 Aug 2018 08:28:00 +0000 (11:28 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 16 Aug 2018 14:01:07 +0000 (17:01 +0300)
We use writev() so every call ends up going to the kernel but for small
buffers we generally would prefer to do as few write calls as possible.

https://bugzilla.gnome.org/show_bug.cgi?id=794173

plugins/elements/gstfilesink.c
plugins/elements/gstfilesink.h

index 7fa1f36..67135fc 100644 (file)
@@ -183,6 +183,8 @@ static gboolean gst_file_sink_query (GstBaseSink * bsink, GstQuery * query);
 static void gst_file_sink_uri_handler_init (gpointer g_iface,
     gpointer iface_data);
 
+static GstFlowReturn gst_file_sink_flush_buffer (GstFileSink * filesink);
+
 #define _do_init \
   G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_file_sink_uri_handler_init); \
   GST_DEBUG_CATEGORY_INIT (gst_file_sink_debug, "filesink", 0, "filesink element");
@@ -377,6 +379,18 @@ gst_file_sink_open_file (GstFileSink * sink)
   /* try to seek in the file to figure out if it is seekable */
   sink->seekable = gst_file_sink_do_seek (sink, 0);
 
+  if (sink->buffer)
+    gst_buffer_list_unref (sink->buffer);
+  sink->buffer = NULL;
+  if (sink->buffer_mode != GST_FILE_SINK_BUFFER_MODE_UNBUFFERED) {
+    if (sink->buffer_size == 0) {
+      sink->buffer_size = DEFAULT_BUFFER_SIZE;
+      g_object_notify (G_OBJECT (sink), "buffer-size");
+    }
+
+    sink->buffer = gst_buffer_list_new ();
+  }
+
   GST_DEBUG_OBJECT (sink, "opened file %s, seekable %d",
       sink->filename, sink->seekable);
 
@@ -402,6 +416,10 @@ static void
 gst_file_sink_close_file (GstFileSink * sink)
 {
   if (sink->file) {
+    if (gst_file_sink_flush_buffer (sink) != GST_FLOW_OK)
+      GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE,
+          (_("Error closing file \"%s\"."), sink->filename), NULL);
+
     if (fclose (sink->file) != 0)
       GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE,
           (_("Error closing file \"%s\"."), sink->filename), GST_ERROR_SYSTEM);
@@ -409,6 +427,11 @@ gst_file_sink_close_file (GstFileSink * sink)
     GST_DEBUG_OBJECT (sink, "closed file");
     sink->file = NULL;
   }
+
+  if (sink->buffer) {
+    gst_buffer_list_unref (sink->buffer);
+    sink->buffer = NULL;
+  }
 }
 
 static gboolean
@@ -477,6 +500,9 @@ gst_file_sink_do_seek (GstFileSink * filesink, guint64 new_offset)
   GST_DEBUG_OBJECT (filesink, "Seeking to offset %" G_GUINT64_FORMAT
       " using " __GST_STDIO_SEEK_FUNCTION, new_offset);
 
+  if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
+    goto flush_buffer_failed;
+
 #ifdef HAVE_FSEEKO
   if (fseeko (filesink->file, (off_t) new_offset, SEEK_SET) != 0)
     goto seek_failed;
@@ -496,6 +522,11 @@ gst_file_sink_do_seek (GstFileSink * filesink, guint64 new_offset)
   return TRUE;
 
   /* ERRORS */
+flush_buffer_failed:
+  {
+    GST_DEBUG_OBJECT (filesink, "Flushing buffer failed");
+    return FALSE;
+  }
 seek_failed:
   {
     GST_DEBUG_OBJECT (filesink, "Seeking failed: %s", g_strerror (errno));
@@ -548,6 +579,10 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event)
           goto truncate_failed;
       }
       break;
+    case GST_EVENT_EOS:
+      if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK)
+        goto flush_buffer_failed;
+      break;
     default:
       break;
   }
@@ -563,6 +598,13 @@ seek_failed:
     gst_event_unref (event);
     return FALSE;
   }
+flush_buffer_failed:
+  {
+    GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
+        (_("Error while writing to file \"%s\"."), filesink->filename), NULL);
+    gst_event_unref (event);
+    return FALSE;
+  }
 truncate_failed:
   {
     GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
@@ -578,6 +620,11 @@ gst_file_sink_get_current_offset (GstFileSink * filesink, guint64 * p_pos)
 {
   off_t ret = -1;
 
+  /* no need to flush internal buffer here as this is only called right
+   * after a seek. If this changes then the buffer should be flushed here
+   * too
+   */
+
 #ifdef HAVE_FTELLO
   ret = ftello (filesink->file);
 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
@@ -605,17 +652,14 @@ gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
 }
 
 static GstFlowReturn
-gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
+gst_file_sink_render_list_internal (GstFileSink * sink,
+    GstBufferList * buffer_list)
 {
   GstFlowReturn flow;
   GstBuffer **buffers;
-  GstFileSink *sink;
   guint8 *mem_nums;
   guint total_mems;
   guint i, num_buffers;
-  gboolean sync_after = FALSE;
-
-  sink = GST_FILE_SINK_CAST (bsink);
 
   num_buffers = gst_buffer_list_length (buffer_list);
   if (num_buffers == 0)
@@ -628,14 +672,102 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
     buffers[i] = gst_buffer_list_get (buffer_list, i);
     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
     total_mems += mem_nums[i];
-    if (GST_BUFFER_FLAG_IS_SET (buffers[i], GST_BUFFER_FLAG_SYNC_AFTER))
-      sync_after = TRUE;
   }
 
   flow =
       gst_file_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
       total_mems);
 
+  return flow;
+
+no_data:
+  {
+    GST_LOG_OBJECT (sink, "empty buffer list");
+    return GST_FLOW_OK;
+  }
+}
+
+static GstFlowReturn
+gst_file_sink_flush_buffer (GstFileSink * filesink)
+{
+  GstFlowReturn flow_ret = GST_FLOW_OK;
+
+  if (filesink->buffer) {
+    guint length;
+
+    length = gst_buffer_list_length (filesink->buffer);
+
+    if (length > 0) {
+      GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %u",
+          filesink->current_buffer_size);
+      flow_ret =
+          gst_file_sink_render_list_internal (filesink, filesink->buffer);
+      /* Remove all buffers from the list but keep the list. This ensures that
+       * we don't re-allocate the array storing the buffers all the time */
+      gst_buffer_list_remove (filesink->buffer, 0, length);
+      filesink->current_buffer_size = 0;
+    }
+  }
+
+  return flow_ret;
+}
+
+static gboolean
+has_sync_after_buffer (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+  if (GST_BUFFER_FLAG_IS_SET (*buffer, GST_BUFFER_FLAG_SYNC_AFTER)) {
+    gboolean *sync_after = user_data;
+
+    *sync_after = TRUE;
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+static gboolean
+accumulate_size (GstBuffer ** buffer, guint idx, gpointer user_data)
+{
+  guint *size = user_data;
+
+  *size += gst_buffer_get_size (*buffer);
+
+  return TRUE;
+}
+
+static GstFlowReturn
+gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
+{
+  GstFlowReturn flow;
+  GstFileSink *sink;
+  guint i, num_buffers;
+  gboolean sync_after = FALSE;
+
+  sink = GST_FILE_SINK_CAST (bsink);
+
+  num_buffers = gst_buffer_list_length (buffer_list);
+  if (num_buffers == 0)
+    goto no_data;
+
+  gst_buffer_list_foreach (buffer_list, has_sync_after_buffer, &sync_after);
+
+  if (sync_after || !sink->buffer) {
+    flow = gst_file_sink_render_list_internal (sink, buffer_list);
+  } else {
+    guint size = 0;
+    gst_buffer_list_foreach (buffer_list, accumulate_size, &size);
+
+    for (i = 0; i < num_buffers; ++i)
+      gst_buffer_list_add (sink->buffer,
+          gst_buffer_ref (gst_buffer_list_get (buffer_list, i)));
+    sink->current_buffer_size += size;
+
+    if (sink->current_buffer_size > sink->buffer_size)
+      flow = gst_file_sink_flush_buffer (sink);
+    else
+      flow = GST_FLOW_OK;
+  }
+
   if (flow == GST_FLOW_OK && sync_after) {
     if (fsync (fileno (sink->file))) {
       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
@@ -660,18 +792,30 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
   GstFileSink *filesink;
   GstFlowReturn flow;
   guint8 n_mem;
+  gboolean sync_after;
 
   filesink = GST_FILE_SINK_CAST (sink);
 
+  sync_after = GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_SYNC_AFTER);
+
   n_mem = gst_buffer_n_memory (buffer);
 
-  if (n_mem > 0)
+  if (n_mem > 0 && (sync_after || !filesink->buffer)) {
     flow = gst_file_sink_render_buffers (filesink, &buffer, 1, &n_mem, n_mem);
-  else
+  } else if (n_mem > 0) {
+
+    filesink->current_buffer_size += gst_buffer_get_size (buffer);
+    gst_buffer_list_add (filesink->buffer, gst_buffer_ref (buffer));
+
+    if (filesink->current_buffer_size > filesink->buffer_size)
+      flow = gst_file_sink_flush_buffer (filesink);
+    else
+      flow = GST_FLOW_OK;
+  } else {
     flow = GST_FLOW_OK;
+  }
 
-  if (flow == GST_FLOW_OK &&
-      GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_SYNC_AFTER)) {
+  if (flow == GST_FLOW_OK && sync_after) {
     if (fsync (fileno (filesink->file))) {
       GST_ELEMENT_ERROR (filesink, RESOURCE, WRITE,
           (_("Error while writing to file \"%s\"."), filesink->filename),
index 99718a3..c41a9a2 100644 (file)
@@ -81,6 +81,9 @@ struct _GstFileSink {
   gint    buffer_mode;
   guint   buffer_size;
 
+  GstBufferList *buffer;
+  guint   current_buffer_size;
+
   gboolean append;
 };