queue: Ignore thresholds if a query is queued
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsink.c
index 80e1f30..ef24800 100644 (file)
  *
  * You should have received a copy of the GNU Library General Public
  * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
  */
 
 /**
  * SECTION:element-fdsink
+ * @title: fdsink
  * @see_also: #GstFdSrc
  *
  * Write data to a unix file descriptor.
@@ -29,8 +30,6 @@
  * This element will synchronize on the clock before writing the data on the
  * socket. For file descriptors where this does not make sense (files, ...) the
  * #GstBaseSink:sync property can be used to disable synchronisation.
- *
- * Last reviewed on 2006-04-28 (0.10.6)
  */
 
 #ifdef HAVE_CONFIG_H
 
 #include <sys/types.h>
 
-#ifdef G_OS_WIN32
-#include <io.h>                 /* lseek, open, close, read */
-#undef lseek
-#define lseek _lseeki64
-#undef off_t
-#define off_t guint64
-#endif
-
 #include <sys/stat.h>
 #ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
 #include <string.h>
 
 #include "gstfdsink.h"
+#include "gstelements_private.h"
+
+#ifdef G_OS_WIN32
+#include <io.h>                 /* lseek, open, close, read */
+#undef lseek
+#define lseek _lseeki64
+#undef off_t
+#define off_t guint64
+#endif
+
+#ifdef __BIONIC__               /* Android */
+#undef lseek
+#define lseek lseek64
+#undef fstat
+#define fstat fstat64
+#undef off_t
+#define off_t guint64
+#endif
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -94,23 +103,11 @@ enum
 static void gst_fd_sink_uri_handler_init (gpointer g_iface,
     gpointer iface_data);
 
-static void
-_do_init (GType gst_fd_sink_type)
-{
-  static const GInterfaceInfo urihandler_info = {
-    gst_fd_sink_uri_handler_init,
-    NULL,
-    NULL
-  };
-
-  g_type_add_interface_static (gst_fd_sink_type, GST_TYPE_URI_HANDLER,
-      &urihandler_info);
-
+#define _do_init \
+  G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_fd_sink_uri_handler_init); \
   GST_DEBUG_CATEGORY_INIT (gst_fd_sink__debug, "fdsink", 0, "fdsink element");
-}
-
-GST_BOILERPLATE_FULL (GstFdSink, gst_fd_sink, GstBaseSink, GST_TYPE_BASE_SINK,
-    _do_init);
+#define gst_fd_sink_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstFdSink, gst_fd_sink, GST_TYPE_BASE_SINK, _do_init);
 
 static void gst_fd_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -118,9 +115,11 @@ static void gst_fd_sink_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 static void gst_fd_sink_dispose (GObject * obj);
 
-static gboolean gst_fd_sink_query (GstPad * pad, GstQuery * query);
+static gboolean gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query);
 static GstFlowReturn gst_fd_sink_render (GstBaseSink * sink,
     GstBuffer * buffer);
+static GstFlowReturn gst_fd_sink_render_list (GstBaseSink * bsink,
+    GstBufferList * buffer_list);
 static gboolean gst_fd_sink_start (GstBaseSink * basesink);
 static gboolean gst_fd_sink_stop (GstBaseSink * basesink);
 static gboolean gst_fd_sink_unlock (GstBaseSink * basesink);
@@ -130,37 +129,34 @@ static gboolean gst_fd_sink_event (GstBaseSink * sink, GstEvent * event);
 static gboolean gst_fd_sink_do_seek (GstFdSink * fdsink, guint64 new_offset);
 
 static void
-gst_fd_sink_base_init (gpointer g_class)
-{
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_set_details_simple (gstelement_class,
-      "Filedescriptor Sink",
-      "Sink/File",
-      "Write data to a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
-}
-
-static void
 gst_fd_sink_class_init (GstFdSinkClass * klass)
 {
   GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
   GstBaseSinkClass *gstbasesink_class;
 
   gobject_class = G_OBJECT_CLASS (klass);
+  gstelement_class = GST_ELEMENT_CLASS (klass);
   gstbasesink_class = GST_BASE_SINK_CLASS (klass);
 
   gobject_class->set_property = gst_fd_sink_set_property;
   gobject_class->get_property = gst_fd_sink_get_property;
   gobject_class->dispose = gst_fd_sink_dispose;
 
+  gst_element_class_set_static_metadata (gstelement_class,
+      "Filedescriptor Sink",
+      "Sink/File",
+      "Write data to a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
+
   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_fd_sink_render);
+  gstbasesink_class->render_list = GST_DEBUG_FUNCPTR (gst_fd_sink_render_list);
   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_fd_sink_start);
   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_fd_sink_stop);
   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_sink_unlock);
   gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_fd_sink_unlock_stop);
   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_fd_sink_event);
+  gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_fd_sink_query);
 
   g_object_class_install_property (gobject_class, ARG_FD,
       g_param_spec_int ("fd", "fd", "An open file descriptor to write to",
@@ -168,13 +164,8 @@ gst_fd_sink_class_init (GstFdSinkClass * klass)
 }
 
 static void
-gst_fd_sink_init (GstFdSink * fdsink, GstFdSinkClass * klass)
+gst_fd_sink_init (GstFdSink * fdsink)
 {
-  GstPad *pad;
-
-  pad = GST_BASE_SINK_PAD (fdsink);
-  gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_fd_sink_query));
-
   fdsink->fd = 1;
   fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
   fdsink->bytes_written = 0;
@@ -195,34 +186,42 @@ gst_fd_sink_dispose (GObject * obj)
 }
 
 static gboolean
-gst_fd_sink_query (GstPad * pad, GstQuery * query)
+gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query)
 {
+  gboolean res = FALSE;
   GstFdSink *fdsink;
-  GstFormat format;
 
-  fdsink = GST_FD_SINK (GST_PAD_PARENT (pad));
+  fdsink = GST_FD_SINK (bsink);
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_POSITION:
+    {
+      GstFormat format;
+
       gst_query_parse_position (query, &format, NULL);
+
       switch (format) {
         case GST_FORMAT_DEFAULT:
         case GST_FORMAT_BYTES:
           gst_query_set_position (query, GST_FORMAT_BYTES, fdsink->current_pos);
-          return TRUE;
+          res = TRUE;
+          break;
         default:
-          return FALSE;
+          break;
       }
-
+      break;
+    }
     case GST_QUERY_FORMATS:
       gst_query_set_formats (query, 2, GST_FORMAT_DEFAULT, GST_FORMAT_BYTES);
-      return TRUE;
-
+      res = TRUE;
+      break;
     case GST_QUERY_URI:
       gst_query_set_uri (query, fdsink->uri);
-      return TRUE;
+      res = TRUE;
+      break;
+    case GST_QUERY_SEEKING:{
+      GstFormat format;
 
-    case GST_QUERY_SEEKING:
       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
       if (format == GST_FORMAT_BYTES || format == GST_FORMAT_DEFAULT) {
         gst_query_set_seeking (query, GST_FORMAT_BYTES, fdsink->seekable, 0,
@@ -230,110 +229,104 @@ gst_fd_sink_query (GstPad * pad, GstQuery * query)
       } else {
         gst_query_set_seeking (query, format, FALSE, 0, -1);
       }
-      return TRUE;
-
+      res = TRUE;
+      break;
+    }
     default:
-      return gst_pad_query_default (pad, query);
+      res = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
+      break;
+
   }
+  return res;
 }
 
 static GstFlowReturn
-gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
+gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
+    guint num_buffers, guint8 * mem_nums, guint total_mems)
 {
-  GstFdSink *fdsink;
-  guint8 *data;
-  guint size;
-  gint written;
-
-#ifndef HAVE_WIN32
-  gint retval;
-#endif
+  GstFlowReturn ret;
+  guint64 skip = 0;
 
-  fdsink = GST_FD_SINK (sink);
+  for (;;) {
+    guint64 bytes_written = 0;
 
-  g_return_val_if_fail (fdsink->fd >= 0, GST_FLOW_ERROR);
+    ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
+        buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip);
 
-  data = GST_BUFFER_DATA (buffer);
-  size = GST_BUFFER_SIZE (buffer);
+    sink->bytes_written += bytes_written;
+    sink->current_pos += bytes_written;
+    skip += bytes_written;
 
-again:
-#ifndef HAVE_WIN32
-  do {
-    GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write",
-        size);
-    retval = gst_poll_wait (fdsink->fdset, GST_CLOCK_TIME_NONE);
-  } while (retval == -1 && (errno == EINTR || errno == EAGAIN));
+    if (!sink->unlock)
+      break;
 
-  if (retval == -1) {
-    if (errno == EBUSY)
-      goto stopped;
-    else
-      goto select_error;
+    ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
+    if (ret != GST_FLOW_OK)
+      return ret;
   }
-#endif
 
-  GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size,
-      fdsink->fd);
+  return ret;
+}
 
-  written = write (fdsink->fd, data, size);
+static GstFlowReturn
+gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
+{
+  GstFlowReturn flow;
+  GstBuffer **buffers;
+  GstFdSink *sink;
+  guint8 *mem_nums;
+  guint total_mems;
+  guint i, num_buffers;
+
+  sink = GST_FD_SINK_CAST (bsink);
+
+  num_buffers = gst_buffer_list_length (buffer_list);
+  if (num_buffers == 0)
+    goto no_data;
+
+  /* extract buffers from list and count memories */
+  buffers = g_newa (GstBuffer *, num_buffers);
+  mem_nums = g_newa (guint8, num_buffers);
+  for (i = 0, total_mems = 0; i < num_buffers; ++i) {
+    buffers[i] = gst_buffer_list_get (buffer_list, i);
+    mem_nums[i] = gst_buffer_n_memory (buffers[i]);
+    total_mems += mem_nums[i];
+  }
 
-  /* check for errors */
-  if (G_UNLIKELY (written < 0)) {
-    /* try to write again on non-fatal errors */
-    if (errno == EAGAIN || errno == EINTR)
-      goto again;
+  flow =
+      gst_fd_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
+      total_mems);
 
-    /* else go to our error handler */
-    goto write_error;
-  }
+  return flow;
 
-  /* all is fine when we get here */
-  size -= written;
-  data += written;
-  fdsink->bytes_written += written;
-  fdsink->current_pos += written;
+no_data:
+  {
+    GST_LOG_OBJECT (sink, "empty buffer list");
+    return GST_FLOW_OK;
+  }
+}
 
-  GST_DEBUG_OBJECT (fdsink, "wrote %d bytes, %d left", written, size);
+static GstFlowReturn
+gst_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
+{
+  GstFlowReturn flow;
+  GstFdSink *sink;
+  guint8 n_mem;
 
-  /* short write, select and try to write the remainder */
-  if (G_UNLIKELY (size > 0))
-    goto again;
+  sink = GST_FD_SINK_CAST (bsink);
 
-  return GST_FLOW_OK;
+  n_mem = gst_buffer_n_memory (buffer);
 
-#ifndef HAVE_WIN32
-select_error:
-  {
-    GST_ELEMENT_ERROR (fdsink, RESOURCE, READ, (NULL),
-        ("select on file descriptor: %s.", g_strerror (errno)));
-    GST_DEBUG_OBJECT (fdsink, "Error during select");
-    return GST_FLOW_ERROR;
-  }
-stopped:
-  {
-    GST_DEBUG_OBJECT (fdsink, "Select stopped");
-    return GST_FLOW_WRONG_STATE;
-  }
-#endif
+  if (n_mem > 0)
+    flow = gst_fd_sink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
+  else
+    flow = GST_FLOW_OK;
 
-write_error:
-  {
-    switch (errno) {
-      case ENOSPC:
-        GST_ELEMENT_ERROR (fdsink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
-        break;
-      default:{
-        GST_ELEMENT_ERROR (fdsink, RESOURCE, WRITE, (NULL),
-            ("Error while writing to file descriptor %d: %s",
-                fdsink->fd, g_strerror (errno)));
-      }
-    }
-    return GST_FLOW_ERROR;
-  }
+  return flow;
 }
 
 static gboolean
-gst_fd_sink_check_fd (GstFdSink * fdsink, int fd)
+gst_fd_sink_check_fd (GstFdSink * fdsink, int fd, GError ** error)
 {
   struct stat stat_results;
   off_t result;
@@ -365,6 +358,8 @@ invalid:
   {
     GST_ELEMENT_ERROR (fdsink, RESOURCE, WRITE, (NULL),
         ("File descriptor %d is not valid: %s", fd, g_strerror (errno)));
+    g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+        "File descriptor %d is not valid: %s", fd, g_strerror (errno));
     return FALSE;
   }
 not_seekable:
@@ -381,7 +376,7 @@ gst_fd_sink_start (GstBaseSink * basesink)
   GstPollFD fd = GST_POLL_FD_INIT;
 
   fdsink = GST_FD_SINK (basesink);
-  if (!gst_fd_sink_check_fd (fdsink, fdsink->fd))
+  if (!gst_fd_sink_check_fd (fdsink, fdsink->fd, NULL))
     return FALSE;
 
   if ((fdsink->fdset = gst_poll_new (TRUE)) == NULL)
@@ -428,6 +423,7 @@ gst_fd_sink_unlock (GstBaseSink * basesink)
 
   GST_LOG_OBJECT (fdsink, "Flushing");
   GST_OBJECT_LOCK (fdsink);
+  fdsink->unlock = TRUE;
   gst_poll_set_flushing (fdsink->fdset, TRUE);
   GST_OBJECT_UNLOCK (fdsink);
 
@@ -441,6 +437,7 @@ gst_fd_sink_unlock_stop (GstBaseSink * basesink)
 
   GST_LOG_OBJECT (fdsink, "No longer flushing");
   GST_OBJECT_LOCK (fdsink);
+  fdsink->unlock = FALSE;
   gst_poll_set_flushing (fdsink->fdset, FALSE);
   GST_OBJECT_UNLOCK (fdsink);
 
@@ -448,12 +445,15 @@ gst_fd_sink_unlock_stop (GstBaseSink * basesink)
 }
 
 static gboolean
-gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd)
+gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd, GError ** error)
 {
-  if (new_fd < 0)
+  if (new_fd < 0) {
+    g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
+        "File descriptor %d is not valid", new_fd);
     return FALSE;
+  }
 
-  if (!gst_fd_sink_check_fd (fdsink, new_fd))
+  if (!gst_fd_sink_check_fd (fdsink, new_fd, error))
     goto invalid;
 
   /* assign the fd */
@@ -495,7 +495,7 @@ gst_fd_sink_set_property (GObject * object, guint prop_id,
       int fd;
 
       fd = g_value_get_int (value);
-      gst_fd_sink_update_fd (fdsink, fd);
+      gst_fd_sink_update_fd (fdsink, fd, NULL);
       break;
     }
     default:
@@ -559,28 +559,27 @@ gst_fd_sink_event (GstBaseSink * sink, GstEvent * event)
   type = GST_EVENT_TYPE (event);
 
   switch (type) {
-    case GST_EVENT_NEWSEGMENT:
+    case GST_EVENT_SEGMENT:
     {
-      gint64 start, stop, pos;
-      GstFormat format;
-      gst_event_parse_new_segment (event, NULL, NULL, &format, &start,
-          &stop, &pos);
+      const GstSegment *segment;
+
+      gst_event_parse_segment (event, &segment);
 
-      if (format == GST_FORMAT_BYTES) {
+      if (segment->format == GST_FORMAT_BYTES) {
         /* only try to seek and fail when we are going to a different
          * position */
-        if (fdsink->current_pos != start) {
+        if (fdsink->current_pos != segment->start) {
           /* FIXME, the seek should be performed on the pos field, start/stop are
            * just boundaries for valid bytes offsets. We should also fill the file
            * with zeroes if the new position extends the current EOF (sparse streams
            * and segment accumulation). */
-          if (!gst_fd_sink_do_seek (fdsink, (guint64) start))
+          if (!gst_fd_sink_do_seek (fdsink, (guint64) segment->start))
             goto seek_failed;
         }
       } else {
         GST_DEBUG_OBJECT (fdsink,
-            "Ignored NEWSEGMENT event of format %u (%s)", (guint) format,
-            gst_format_get_name (format));
+            "Ignored SEGMENT event of format %u (%s)", (guint) segment->format,
+            gst_format_get_name (segment->format));
       }
       break;
     }
@@ -588,13 +587,14 @@ gst_fd_sink_event (GstBaseSink * sink, GstEvent * event)
       break;
   }
 
-  return TRUE;
+  return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
 
 seek_failed:
   {
     GST_ELEMENT_ERROR (fdsink, RESOURCE, SEEK, (NULL),
         ("Error while seeking on file descriptor %d: %s",
             fdsink->fd, g_strerror (errno)));
+    gst_event_unref (event);
     return FALSE;
   }
 
@@ -603,45 +603,42 @@ seek_failed:
 /*** GSTURIHANDLER INTERFACE *************************************************/
 
 static GstURIType
-gst_fd_sink_uri_get_type (void)
+gst_fd_sink_uri_get_type (GType type)
 {
   return GST_URI_SINK;
 }
 
-static gchar **
-gst_fd_sink_uri_get_protocols (void)
+static const gchar *const *
+gst_fd_sink_uri_get_protocols (GType type)
 {
-  static gchar *protocols[] = { (char *) "fd", NULL };
+  static const gchar *protocols[] = { "fd", NULL };
 
   return protocols;
 }
 
-static const gchar *
+static gchar *
 gst_fd_sink_uri_get_uri (GstURIHandler * handler)
 {
   GstFdSink *sink = GST_FD_SINK (handler);
 
-  return sink->uri;
+  /* FIXME: make thread-safe */
+  return g_strdup (sink->uri);
 }
 
 static gboolean
-gst_fd_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri)
+gst_fd_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
+    GError ** error)
 {
-  gchar *protocol;
   GstFdSink *sink = GST_FD_SINK (handler);
   gint fd;
 
-  protocol = gst_uri_get_protocol (uri);
-  if (strcmp (protocol, "fd") != 0) {
-    g_free (protocol);
+  if (sscanf (uri, "fd://%d", &fd) != 1) {
+    g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
+        "File descriptor URI could not be parsed");
     return FALSE;
   }
-  g_free (protocol);
-
-  if (sscanf (uri, "fd://%d", &fd) != 1)
-    return FALSE;
 
-  return gst_fd_sink_update_fd (sink, fd);
+  return gst_fd_sink_update_fd (sink, fd, error);
 }
 
 static void