plugins: add helper function for writing buffers out with writev()
authorTim-Philipp Müller <tim@centricular.com>
Fri, 28 Nov 2014 14:38:30 +0000 (14:38 +0000)
committerTim-Philipp Müller <tim@centricular.com>
Sun, 30 Nov 2014 14:40:46 +0000 (14:40 +0000)
configure.ac
plugins/elements/gstelements_private.c
plugins/elements/gstelements_private.h

index 1e7c88d..ab50a3a 100644 (file)
@@ -344,6 +344,9 @@ AM_CONDITIONAL(HAVE_PTHREAD, test "x$HAVE_PTHREAD" = "xyes")
 dnl check for sys/prctl for setting thread name on Linux
 AC_CHECK_HEADERS([sys/prctl.h], [], [], [AC_INCLUDES_DEFAULT])
 
+dnl check for sys/uio.h for writev()
+AC_CHECK_HEADERS([sys/uio.h], [], [], [AC_INCLUDES_DEFAULT])
+
 dnl Check for valgrind.h
 dnl separate from HAVE_VALGRIND because you can have the program, but not
 dnl the dev package
index f539a85..ff2a3a1 100644 (file)
 #ifdef HAVE_CONFIG_H
 # include "config.h"
 #endif
+#include <stdio.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+#include <errno.h>
+#include <string.h>
 #include <string.h>
 #include "gst/gst.h"
 #include "gstelements_private.h"
@@ -65,3 +74,235 @@ gst_buffer_get_flags_string (GstBuffer * buffer)
 
   return flag_str;
 }
+
+/* Define our own iovec structure here, so that we can use it unconditionally
+ * in the code below and use almost the same code path for systems where
+ * writev() is supported and those were it's not supported */
+#ifndef HAVE_SYS_UIO_H
+struct iovec
+{
+  gpointer iov_base;
+  gsize iov_len;
+};
+#endif
+
+/* completely arbitrary thresholds */
+#define FDSINK_MAX_ALLOCA_SIZE (64 * 1024)      /* 64k */
+#define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024)       /*  8M */
+
+static gssize
+gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
+{
+  gssize written;
+
+#ifdef HAVE_SYS_UIO_H
+  if (TRUE) {
+    do {
+      written = writev (fd, iov, iovcnt);
+    } while (written < 0 && errno == EINTR);
+  } else
+#endif
+  {
+    gint i;
+
+    /* We merge the memories here because technically write()/writev() is
+     * supposed to be atomic, which it's not if we do multiple separate
+     * write() calls. It's very doubtful anyone cares though in our use
+     * cases, and it's not clear how that can be reconciled with the
+     * possibility of short writes, so in any case we might want to
+     * simplify this later or just remove it. */
+    if (total_bytes <= FDSINK_MAX_MALLOC_SIZE) {
+      gchar *mem, *p;
+
+      if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE)
+        mem = g_alloca (total_bytes);
+      else
+        mem = g_malloc (total_bytes);
+
+      p = mem;
+      for (i = 0; i < iovcnt; ++i) {
+        memcpy (p, iov[i].iov_base, iov[i].iov_len);
+        p += iov[i].iov_len;
+      }
+
+      do {
+        written = write (fd, mem, total_bytes);
+      } while (written < 0 && errno == EINTR);
+
+      if (total_bytes > FDSINK_MAX_ALLOCA_SIZE)
+        g_free (mem);
+    } else {
+      gssize ret;
+
+      written = 0;
+      for (i = 0; i < iovcnt; ++i) {
+        do {
+          ret = write (fd, iov[i].iov_base, iov[i].iov_len);
+        } while (ret < 0 && errno == EINTR);
+        if (ret > 0)
+          written += ret;
+        if (ret != iov[i].iov_len)
+          break;
+      }
+    }
+  }
+
+  return written;
+}
+
+static gsize
+fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
+{
+  GstMemory *mem;
+  gsize size = 0;
+  guint i;
+
+  g_assert (gst_buffer_n_memory (buf) == n);
+
+  for (i = 0; i < n; ++i) {
+    mem = gst_buffer_peek_memory (buf, i);
+    if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
+      vecs[i].iov_base = maps[i].data;
+      vecs[i].iov_len = maps[i].size;
+    } else {
+      GST_WARNING ("Failed to map memory %p for reading", mem);
+      vecs[i].iov_base = (void *) "";
+      vecs[i].iov_len = 0;
+    }
+    size += vecs[i].iov_len;
+  }
+
+  return size;
+}
+
+GstFlowReturn
+gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
+    GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
+    guint total_mem_num, guint64 * total_written, guint64 * cur_pos)
+{
+  struct iovec *vecs;
+  GstMapInfo *map_infos;
+  GstFlowReturn flow_ret;
+  gsize size = 0;
+  guint i, j;
+
+  GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
+
+  vecs = g_newa (struct iovec, total_mem_num);
+  map_infos = g_newa (GstMapInfo, total_mem_num);
+
+  /* populate output vectors */
+  for (i = 0, j = 0; i < num_buffers; ++i) {
+    size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]);
+    j += mem_nums[i];
+  }
+
+  /* now write it all out! */
+  {
+    gssize ret, left;
+    guint n_vecs = total_mem_num;
+
+    left = size;
+    do {
+#ifndef HAVE_WIN32
+      if (fdset != NULL) {
+        do {
+          GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT
+              " bytes to write", left);
+          ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE);
+        } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
+
+        if (ret == -1) {
+          if (errno == EBUSY)
+            goto stopped;
+          else
+            goto select_error;
+        }
+      }
+#endif
+
+      ret = gst_writev (fd, vecs, n_vecs, left);
+
+      if (ret > 0) {
+        if (total_written)
+          *total_written += ret;
+        if (cur_pos)
+          *cur_pos += ret;
+      }
+
+      if (ret == left)
+        break;
+
+      if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+        /* do nothing, try again */
+      } else if (ret < 0) {
+        goto write_error;
+      } else if (ret < left) {
+        /* skip vectors that have been written in full */
+        while (ret >= vecs[0].iov_len) {
+          ret -= vecs[0].iov_len;
+          left -= vecs[0].iov_len;
+          ++vecs;
+          --n_vecs;
+        }
+        g_assert (n_vecs > 0);
+        /* skip partially written vector data */
+        if (ret > 0) {
+          vecs[0].iov_len -= ret;
+          vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret;
+          left -= ret;
+        }
+      }
+#ifdef HAVE_WIN32
+      /* do short sleep on windows where we don't use gst_poll(),
+       * to avoid excessive busy looping */
+      if (fdset != NULL)
+        g_usleep (1000);
+#endif
+
+    }
+    while (left > 0);
+  }
+
+  flow_ret = GST_FLOW_OK;
+
+out:
+
+  for (i = 0; i < total_mem_num; ++i)
+    gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
+
+  return flow_ret;
+
+/* ERRORS */
+#ifndef HAVE_WIN32
+select_error:
+  {
+    GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+        ("select on file descriptor: %s", g_strerror (errno)));
+    GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno));
+    flow_ret = GST_FLOW_ERROR;
+    goto out;
+  }
+stopped:
+  {
+    GST_DEBUG_OBJECT (sink, "Select stopped");
+    flow_ret = GST_FLOW_FLUSHING;
+    goto out;
+  }
+#endif
+write_error:
+  {
+    switch (errno) {
+      case ENOSPC:
+        GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
+        break;
+      default:{
+        GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
+            ("Error while writing to file descriptor %d: %s",
+                fd, g_strerror (errno)));
+      }
+    }
+    flow_ret = GST_FLOW_ERROR;
+    goto out;
+  }
+}
index 9ccae1b..469b3b8 100644 (file)
@@ -30,6 +30,12 @@ G_BEGIN_DECLS
 G_GNUC_INTERNAL
 char *    gst_buffer_get_flags_string                   (GstBuffer *buffer);
 
+G_GNUC_INTERNAL
+GstFlowReturn  gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
+                                   GstBuffer ** buffers, guint num_buffers,
+                                   guint8 * mem_nums, guint total_mem_num,
+                                   guint64 * total_written, guint64 * cur_pos);
+
 G_END_DECLS
 
 #endif /* __GST_ELEMENTS_PRIVATE_H__ */