filesink: Implement workaround for some (network) filesystems that spuriously return...
authorSebastian Dröge <sebastian@centricular.com>
Mon, 6 May 2019 19:17:50 +0000 (22:17 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 16 May 2019 11:15:02 +0000 (14:15 +0300)
This seems to happen when another client is accessing the file at the
same time, and retrying after a short amount of time solves it.

Sometimes partial data is written at that point already but we have no
idea how much it is, or if what was written is correct (it sometimes
isn't) so we always first seek back to the current position and repeat
the whole failed write.

It happens at least on Linux and macOS on SMB/CIFS and NFS file systems.

Between write attempts that failed with EACCES we wait 10ms, and after
enough consecutive tries that failed with EACCES we simply time out.

In theory a valid EACCES for files to which we simply have no access
should've happened already during the call to open(), except for NFS
(see open(2)).

This can be enabled with the new max-transient-error-timeout property, and
a new o-sync boolean property was added to open the file in O_SYNC mode
as without that it's not guaranteed that we get EACCES for the actual
writev() call that failed but might only get it at a later time.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/issues/305

plugins/elements/gstelements_private.c
plugins/elements/gstelements_private.h
plugins/elements/gstfdsink.c
plugins/elements/gstfilesink.c
plugins/elements/gstfilesink.h

index 2c0ca58..1e25f88 100644 (file)
@@ -32,6 +32,7 @@
 #ifdef HAVE_SYS_UIO_H
 #include <sys/uio.h>
 #endif
+#include <sys/types.h>
 #include <errno.h>
 #include <string.h>
 #include <string.h>
 #include "gstelements_private.h"
 
 #ifdef G_OS_WIN32
+#  include <io.h>               /* lseek, open, close, read */
+#  undef lseek
+#  define lseek _lseeki64
+#  undef off_t
+#  define off_t guint64
 #  define WIN32_LEAN_AND_MEAN   /* prevents from including too many things */
 #  include <windows.h>
 #  undef WIN32_LEAN_AND_MEAN
@@ -215,13 +221,20 @@ fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
 GstFlowReturn
 gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
     GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
-    guint total_mem_num, guint64 * bytes_written, guint64 skip)
+    guint total_mem_num, guint64 * bytes_written, guint64 skip,
+    gint max_transient_error_timeout, guint64 current_position,
+    gboolean * flushing)
 {
   struct iovec *vecs;
   GstMapInfo *map_infos;
   GstFlowReturn flow_ret;
   gsize size = 0;
   guint i, j;
+  gint64 start_time = 0;
+
+  max_transient_error_timeout *= 1000;
+  if (max_transient_error_timeout)
+    start_time = g_get_monotonic_time ();
 
   GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
 
@@ -248,6 +261,11 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
     }
 
     do {
+      if (flushing != NULL && g_atomic_int_get (flushing)) {
+        GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
+        flow_ret = GST_FLOW_FLUSHING;
+        goto out;
+      }
 #ifndef HAVE_WIN32
       if (fdset != NULL) {
         do {
@@ -279,9 +297,45 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
 
       if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
         /* do nothing, try again */
+        if (max_transient_error_timeout)
+          start_time = g_get_monotonic_time ();
+      } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
+        /* seek back to where we started writing and try again after sleeping
+         * for 10ms.
+         *
+         * Some network file systems report EACCES spuriously, presumably
+         * because at the same time another client is reading the file.
+         * It happens at least on Linux and macOS on SMB/CIFS and NFS file
+         * systems.
+         *
+         * Note that NFS does not check access permissions during open()
+         * but only on write()/read() according to open(2), so we would
+         * loop here in case of NFS.
+         */
+        if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
+          GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
+              max_transient_error_timeout);
+          goto write_error;
+        }
+        GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
+        g_assert (current_position != -1);
+        g_usleep (10000);
+
+        /* Seek back to the current position, sometimes a partial write
+         * happened and we have no idea how much and if what was written
+         * is actually correct (it sometimes isn't)
+         */
+        ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
+        if (ret < 0 || ret != current_position + *bytes_written) {
+          GST_ERROR_OBJECT (sink,
+              "failed to seek back to current write position");
+          goto write_error;
+        }
       } else if (ret < 0) {
         goto write_error;
-      } else if (ret < left) {
+      } else {                  /* if (ret < left) */
+        if (max_transient_error_timeout)
+          start_time = g_get_monotonic_time ();
         /* skip vectors that have been written in full */
         while (ret >= vecs[0].iov_len) {
           ret -= vecs[0].iov_len;
index d196594..3dcb541 100644 (file)
@@ -37,7 +37,9 @@ G_GNUC_INTERNAL
 GstFlowReturn  gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
                                    GstBuffer ** buffers, guint num_buffers,
                                    guint8 * mem_nums, guint total_mem_num,
-                                   guint64 * bytes_written, guint64 skip);
+                                   guint64 * bytes_written, guint64 skip,
+                                   gint max_transient_error_timeout, guint64 current_position,
+                                   gboolean * flushing);
 
 G_END_DECLS
 
index 7641cde..8f9996f 100644 (file)
@@ -249,7 +249,8 @@ gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
     guint64 bytes_written = 0;
 
     ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
-        buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip);
+        buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip,
+        0, -1, NULL);
 
     sink->bytes_written += bytes_written;
     sink->current_pos += bytes_written;
index c722759..0240048 100644 (file)
@@ -49,6 +49,7 @@
 #include "gstfilesink.h"
 #include <string.h>
 #include <sys/types.h>
+#include <fcntl.h>
 
 #ifdef G_OS_WIN32
 #include <io.h>                 /* lseek, open, close, read */
@@ -106,6 +107,8 @@ GST_DEBUG_CATEGORY_STATIC (gst_file_sink_debug);
 #define DEFAULT_BUFFER_MODE    GST_FILE_SINK_BUFFER_MODE_DEFAULT
 #define DEFAULT_BUFFER_SIZE    64 * 1024
 #define DEFAULT_APPEND         FALSE
+#define DEFAULT_O_SYNC         FALSE
+#define DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT    0
 
 enum
 {
@@ -114,6 +117,8 @@ enum
   PROP_BUFFER_MODE,
   PROP_BUFFER_SIZE,
   PROP_APPEND,
+  PROP_O_SYNC,
+  PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
   PROP_LAST
 };
 
@@ -121,12 +126,12 @@ enum
  * use the 'file pointer' opened in glib (and returned from this function)
  * in this library, as they may have unrelated C runtimes. */
 static FILE *
-gst_fopen (const gchar * filename, const gchar * mode)
+gst_fopen (const gchar * filename, const gchar * mode, gboolean o_sync)
 {
+  FILE *retval;
 #ifdef G_OS_WIN32
   wchar_t *wfilename = g_utf8_to_utf16 (filename, -1, NULL, NULL, NULL);
   wchar_t *wmode;
-  FILE *retval;
   int save_errno;
 
   if (wfilename == NULL) {
@@ -151,7 +156,23 @@ gst_fopen (const gchar * filename, const gchar * mode)
   errno = save_errno;
   return retval;
 #else
-  return fopen (filename, mode);
+  int fd;
+  int flags = O_CREAT | O_WRONLY;
+
+  if (strcmp (mode, "wb") == 0)
+    flags |= O_TRUNC;
+  else if (strcmp (mode, "ab") == 0)
+    flags |= O_APPEND;
+  else
+    g_assert_not_reached ();
+
+  if (o_sync)
+    flags |= O_SYNC;
+
+  fd = open (filename, flags, 0666);
+
+  retval = fdopen (fd, mode);
+  return retval;
 #endif
 }
 
@@ -172,6 +193,8 @@ static GstFlowReturn gst_file_sink_render (GstBaseSink * sink,
     GstBuffer * buffer);
 static GstFlowReturn gst_file_sink_render_list (GstBaseSink * sink,
     GstBufferList * list);
+static gboolean gst_file_sink_unlock (GstBaseSink * sink);
+static gboolean gst_file_sink_unlock_stop (GstBaseSink * sink);
 
 static gboolean gst_file_sink_do_seek (GstFileSink * filesink,
     guint64 new_offset);
@@ -230,6 +253,19 @@ gst_file_sink_class_init (GstFileSinkClass * klass)
           "Append to an already existing file", DEFAULT_APPEND,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_O_SYNC,
+      g_param_spec_boolean ("o-sync", "Synchronous IO",
+          "Open the file with O_SYNC for enabling synchronous IO",
+          DEFAULT_O_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class,
+      PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
+      g_param_spec_int ("max-transient-error-timeout",
+          "Max Transient Error Timeout",
+          "Retry up to this many ms on transient errors (currently EACCES)", 0,
+          G_MAXINT, DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gst_element_class_set_static_metadata (gstelement_class,
       "File Sink",
       "Sink/File", "Write stream to a file",
@@ -243,6 +279,9 @@ gst_file_sink_class_init (GstFileSinkClass * klass)
   gstbasesink_class->render_list =
       GST_DEBUG_FUNCPTR (gst_file_sink_render_list);
   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_file_sink_event);
+  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_file_sink_unlock);
+  gstbasesink_class->unlock_stop =
+      GST_DEBUG_FUNCPTR (gst_file_sink_unlock_stop);
 
   if (sizeof (off_t) < 8) {
     GST_LOG ("No large file support, sizeof (off_t) = %" G_GSIZE_FORMAT "!",
@@ -330,6 +369,12 @@ gst_file_sink_set_property (GObject * object, guint prop_id,
     case PROP_APPEND:
       sink->append = g_value_get_boolean (value);
       break;
+    case PROP_O_SYNC:
+      sink->o_sync = g_value_get_boolean (value);
+      break;
+    case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
+      sink->max_transient_error_timeout = g_value_get_int (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -355,6 +400,12 @@ gst_file_sink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_APPEND:
       g_value_set_boolean (value, sink->append);
       break;
+    case PROP_O_SYNC:
+      g_value_set_boolean (value, sink->o_sync);
+      break;
+    case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
+      g_value_set_int (value, sink->max_transient_error_timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -369,9 +420,9 @@ gst_file_sink_open_file (GstFileSink * sink)
     goto no_filename;
 
   if (sink->append)
-    sink->file = gst_fopen (sink->filename, "ab");
+    sink->file = gst_fopen (sink->filename, "ab", sink->o_sync);
   else
-    sink->file = gst_fopen (sink->filename, "wb");
+    sink->file = gst_fopen (sink->filename, "wb", sink->o_sync);
   if (sink->file == NULL)
     goto open_failed;
 
@@ -652,13 +703,21 @@ static GstFlowReturn
 gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
     guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size)
 {
+  GstFlowReturn ret;
+  guint64 bytes_written = 0;
+
   GST_DEBUG_OBJECT (sink,
       "writing %u buffers (%u memories, %" G_GSIZE_FORMAT
       " bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size,
       sink->current_pos);
 
-  return gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
-      buffers, num_buffers, mem_nums, total_mems, &sink->current_pos, 0);
+  ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
+      buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0,
+      sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
+
+  sink->current_pos += bytes_written;
+
+  return ret;
 }
 
 static GstFlowReturn
@@ -857,13 +916,45 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
 static gboolean
 gst_file_sink_start (GstBaseSink * basesink)
 {
-  return gst_file_sink_open_file (GST_FILE_SINK (basesink));
+  GstFileSink *filesink;
+
+  filesink = GST_FILE_SINK_CAST (basesink);
+
+  g_atomic_int_set (&filesink->flushing, FALSE);
+  return gst_file_sink_open_file (filesink);
 }
 
 static gboolean
 gst_file_sink_stop (GstBaseSink * basesink)
 {
-  gst_file_sink_close_file (GST_FILE_SINK (basesink));
+  GstFileSink *filesink;
+
+  filesink = GST_FILE_SINK_CAST (basesink);
+
+  gst_file_sink_close_file (filesink);
+  g_atomic_int_set (&filesink->flushing, TRUE);
+  return TRUE;
+}
+
+static gboolean
+gst_file_sink_unlock (GstBaseSink * basesink)
+{
+  GstFileSink *filesink;
+
+  filesink = GST_FILE_SINK_CAST (basesink);
+  g_atomic_int_set (&filesink->flushing, TRUE);
+
+  return TRUE;
+}
+
+static gboolean
+gst_file_sink_unlock_stop (GstBaseSink * basesink)
+{
+  GstFileSink *filesink;
+
+  filesink = GST_FILE_SINK_CAST (basesink);
+  g_atomic_int_set (&filesink->flushing, FALSE);
+
   return TRUE;
 }
 
index c41a9a2..a5dc90b 100644 (file)
@@ -85,6 +85,10 @@ struct _GstFileSink {
   guint   current_buffer_size;
 
   gboolean append;
+  gboolean o_sync;
+  gint max_transient_error_timeout;
+
+  gboolean flushing;
 };
 
 struct _GstFileSinkClass {