#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
+#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <string.h>
#include "gstelements_private.h"
#ifdef G_OS_WIN32
+# include <io.h> /* lseek, open, close, read */
+# undef lseek
+# define lseek _lseeki64
+# undef off_t
+# define off_t guint64
# define WIN32_LEAN_AND_MEAN /* prevents from including too many things */
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN
GstFlowReturn
gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
- guint total_mem_num, guint64 * bytes_written, guint64 skip)
+ guint total_mem_num, guint64 * bytes_written, guint64 skip,
+ gint max_transient_error_timeout, guint64 current_position,
+ gboolean * flushing)
{
struct iovec *vecs;
GstMapInfo *map_infos;
GstFlowReturn flow_ret;
gsize size = 0;
guint i, j;
+ gint64 start_time = 0;
+
+ max_transient_error_timeout *= 1000;
+ if (max_transient_error_timeout)
+ start_time = g_get_monotonic_time ();
GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
}
do {
+ if (flushing != NULL && g_atomic_int_get (flushing)) {
+ GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
+ flow_ret = GST_FLOW_FLUSHING;
+ goto out;
+ }
#ifndef HAVE_WIN32
if (fdset != NULL) {
do {
if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* do nothing, try again */
+ if (max_transient_error_timeout)
+ start_time = g_get_monotonic_time ();
+ } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
+ /* seek back to where we started writing and try again after sleeping
+ * for 10ms.
+ *
+ * Some network file systems report EACCES spuriously, presumably
+ * because at the same time another client is reading the file.
+ * It happens at least on Linux and macOS on SMB/CIFS and NFS file
+ * systems.
+ *
+ * Note that NFS does not check access permissions during open()
+ * but only on write()/read() according to open(2), so we would
+ * loop here in case of NFS.
+ */
+ if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
+ GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
+ max_transient_error_timeout);
+ goto write_error;
+ }
+ GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
+ g_assert (current_position != -1);
+ g_usleep (10000);
+
+ /* Seek back to the current position, sometimes a partial write
+ * happened and we have no idea how much and if what was written
+ * is actually correct (it sometimes isn't)
+ */
+ ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
+ if (ret < 0 || ret != current_position + *bytes_written) {
+ GST_ERROR_OBJECT (sink,
+ "failed to seek back to current write position");
+ goto write_error;
+ }
} else if (ret < 0) {
goto write_error;
- } else if (ret < left) {
+ } else { /* if (ret < left) */
+ if (max_transient_error_timeout)
+ start_time = g_get_monotonic_time ();
/* skip vectors that have been written in full */
while (ret >= vecs[0].iov_len) {
ret -= vecs[0].iov_len;
#include "gstfilesink.h"
#include <string.h>
#include <sys/types.h>
+#include <fcntl.h>
#ifdef G_OS_WIN32
#include <io.h> /* lseek, open, close, read */
#define DEFAULT_BUFFER_MODE GST_FILE_SINK_BUFFER_MODE_DEFAULT
#define DEFAULT_BUFFER_SIZE 64 * 1024
#define DEFAULT_APPEND FALSE
+#define DEFAULT_O_SYNC FALSE
+#define DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT 0
enum
{
PROP_BUFFER_MODE,
PROP_BUFFER_SIZE,
PROP_APPEND,
+ PROP_O_SYNC,
+ PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
PROP_LAST
};
* use the 'file pointer' opened in glib (and returned from this function)
* in this library, as they may have unrelated C runtimes. */
static FILE *
-gst_fopen (const gchar * filename, const gchar * mode)
+gst_fopen (const gchar * filename, const gchar * mode, gboolean o_sync)
{
+ FILE *retval;
#ifdef G_OS_WIN32
wchar_t *wfilename = g_utf8_to_utf16 (filename, -1, NULL, NULL, NULL);
wchar_t *wmode;
- FILE *retval;
int save_errno;
if (wfilename == NULL) {
errno = save_errno;
return retval;
#else
- return fopen (filename, mode);
+ int fd;
+ int flags = O_CREAT | O_WRONLY;
+
+ if (strcmp (mode, "wb") == 0)
+ flags |= O_TRUNC;
+ else if (strcmp (mode, "ab") == 0)
+ flags |= O_APPEND;
+ else
+ g_assert_not_reached ();
+
+ if (o_sync)
+ flags |= O_SYNC;
+
+ fd = open (filename, flags, 0666);
+
+ retval = fdopen (fd, mode);
+ return retval;
#endif
}
GstBuffer * buffer);
static GstFlowReturn gst_file_sink_render_list (GstBaseSink * sink,
GstBufferList * list);
+static gboolean gst_file_sink_unlock (GstBaseSink * sink);
+static gboolean gst_file_sink_unlock_stop (GstBaseSink * sink);
static gboolean gst_file_sink_do_seek (GstFileSink * filesink,
guint64 new_offset);
"Append to an already existing file", DEFAULT_APPEND,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_O_SYNC,
+ g_param_spec_boolean ("o-sync", "Synchronous IO",
+ "Open the file with O_SYNC for enabling synchronous IO",
+ DEFAULT_O_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class,
+ PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
+ g_param_spec_int ("max-transient-error-timeout",
+ "Max Transient Error Timeout",
+ "Retry up to this many ms on transient errors (currently EACCES)", 0,
+ G_MAXINT, DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gst_element_class_set_static_metadata (gstelement_class,
"File Sink",
"Sink/File", "Write stream to a file",
gstbasesink_class->render_list =
GST_DEBUG_FUNCPTR (gst_file_sink_render_list);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_file_sink_event);
+ gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_file_sink_unlock);
+ gstbasesink_class->unlock_stop =
+ GST_DEBUG_FUNCPTR (gst_file_sink_unlock_stop);
if (sizeof (off_t) < 8) {
GST_LOG ("No large file support, sizeof (off_t) = %" G_GSIZE_FORMAT "!",
case PROP_APPEND:
sink->append = g_value_get_boolean (value);
break;
+ case PROP_O_SYNC:
+ sink->o_sync = g_value_get_boolean (value);
+ break;
+ case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
+ sink->max_transient_error_timeout = g_value_get_int (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_APPEND:
g_value_set_boolean (value, sink->append);
break;
+ case PROP_O_SYNC:
+ g_value_set_boolean (value, sink->o_sync);
+ break;
+ case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
+ g_value_set_int (value, sink->max_transient_error_timeout);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
goto no_filename;
if (sink->append)
- sink->file = gst_fopen (sink->filename, "ab");
+ sink->file = gst_fopen (sink->filename, "ab", sink->o_sync);
else
- sink->file = gst_fopen (sink->filename, "wb");
+ sink->file = gst_fopen (sink->filename, "wb", sink->o_sync);
if (sink->file == NULL)
goto open_failed;
gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size)
{
+ GstFlowReturn ret;
+ guint64 bytes_written = 0;
+
GST_DEBUG_OBJECT (sink,
"writing %u buffers (%u memories, %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size,
sink->current_pos);
- return gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
- buffers, num_buffers, mem_nums, total_mems, &sink->current_pos, 0);
+ ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
+ buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0,
+ sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
+
+ sink->current_pos += bytes_written;
+
+ return ret;
}
static GstFlowReturn
static gboolean
gst_file_sink_start (GstBaseSink * basesink)
{
- return gst_file_sink_open_file (GST_FILE_SINK (basesink));
+ GstFileSink *filesink;
+
+ filesink = GST_FILE_SINK_CAST (basesink);
+
+ g_atomic_int_set (&filesink->flushing, FALSE);
+ return gst_file_sink_open_file (filesink);
}
static gboolean
gst_file_sink_stop (GstBaseSink * basesink)
{
- gst_file_sink_close_file (GST_FILE_SINK (basesink));
+ GstFileSink *filesink;
+
+ filesink = GST_FILE_SINK_CAST (basesink);
+
+ gst_file_sink_close_file (filesink);
+ g_atomic_int_set (&filesink->flushing, TRUE);
+ return TRUE;
+}
+
+static gboolean
+gst_file_sink_unlock (GstBaseSink * basesink)
+{
+ GstFileSink *filesink;
+
+ filesink = GST_FILE_SINK_CAST (basesink);
+ g_atomic_int_set (&filesink->flushing, TRUE);
+
+ return TRUE;
+}
+
+static gboolean
+gst_file_sink_unlock_stop (GstBaseSink * basesink)
+{
+ GstFileSink *filesink;
+
+ filesink = GST_FILE_SINK_CAST (basesink);
+ g_atomic_int_set (&filesink->flushing, FALSE);
+
return TRUE;
}