plugins/elements/:
authorPhilippe Rouquier <bonfire-app@wanadoo.fr>
Mon, 30 Jan 2006 21:57:00 +0000 (21:57 +0000)
committerJan Schmidt <thaytan@mad.scientist.com>
Mon, 30 Jan 2006 21:57:00 +0000 (21:57 +0000)
Original commit message from CVS:
* plugins/elements/Makefile.am:
* plugins/elements/gstelements.c:
* plugins/elements/gstfdsink.c: (_do_init),
(gst_fd_sink_base_init), (gst_fd_sink_class_init),
(gst_fd_sink_init), (gst_fd_sink_dispose), (gst_fd_sink_query),
(gst_fd_sink_render), (gst_fd_sink_check_fd), (gst_fd_sink_start),
(gst_fd_sink_stop), (gst_fd_sink_unlock), (gst_fd_sink_update_fd),
(gst_fd_sink_set_property), (gst_fd_sink_uri_get_type),
(gst_fd_sink_uri_get_protocols), (gst_fd_sink_uri_get_uri),
(gst_fd_sink_uri_set_uri), (gst_fd_sink_uri_handler_init):
* plugins/elements/gstfdsink.h:
Port fdsink to 0.10 (patch by Philippe Rouquier) (Fixes #325490)

ChangeLog
plugins/elements/Makefile.am
plugins/elements/gstelements.c
plugins/elements/gstfdsink.c
plugins/elements/gstfdsink.h

index e701deb..8ad943b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,18 @@
+2006-01-30  Jan Schmidt  <thaytan@mad.scientist.com>
+
+       * plugins/elements/Makefile.am:
+       * plugins/elements/gstelements.c:
+       * plugins/elements/gstfdsink.c: (_do_init),
+       (gst_fd_sink_base_init), (gst_fd_sink_class_init),
+       (gst_fd_sink_init), (gst_fd_sink_dispose), (gst_fd_sink_query),
+       (gst_fd_sink_render), (gst_fd_sink_check_fd), (gst_fd_sink_start),
+       (gst_fd_sink_stop), (gst_fd_sink_unlock), (gst_fd_sink_update_fd),
+       (gst_fd_sink_set_property), (gst_fd_sink_uri_get_type),
+       (gst_fd_sink_uri_get_protocols), (gst_fd_sink_uri_get_uri),
+       (gst_fd_sink_uri_set_uri), (gst_fd_sink_uri_handler_init):
+       * plugins/elements/gstfdsink.h:
+       Port fdsink to 0.10 (patch by Philippe Rouquier) (Fixes #325490)
+
 2006-01-30  Stefan Kost  <ensonic@users.sf.net>
 
        * docs/manual/advanced-dparams.xml:
index b68d29d..07fe6e0 100644 (file)
@@ -8,8 +8,10 @@ plugin_LTLIBRARIES = libgstcoreelements.la
 
 if HAVE_SYS_SOCKET_H
 GSTFDSRC = gstfdsrc.c
+GSTFDSINK = gstfdsink.c
 else
 GSTFDSRC =
+GSTFDSINK = 
 endif
 
 libgstcoreelements_la_DEPENDENCIES = $(top_builddir)/gst/libgstreamer-@GST_MAJORMINOR@.la
@@ -20,6 +22,7 @@ libgstcoreelements_la_SOURCES =       \
        gstfakesrc.c            \
        gstfakesink.c           \
        $(GSTFDSRC)             \
+       $(GSTFDSINK)            \
        gstfilesink.c           \
        gstfilesrc.c            \
        gstidentity.c           \
@@ -38,6 +41,7 @@ noinst_HEADERS =              \
        gstfakesink.h           \
        gstfakesrc.h            \
        gstfdsrc.h              \
+       gstfdsink.h             \
        gstfilesink.h           \
        gstfilesrc.h            \
        gstidentity.h           \
@@ -45,4 +49,5 @@ noinst_HEADERS =              \
        gsttee.h                \
        gsttypefindelement.h
 
-EXTRA_DIST = gstfdsrc.c
+EXTRA_DIST = gstfdsrc.c \
+            gstfdsink.c
index 2ae4e0f..b70aa9e 100644 (file)
@@ -30,6 +30,7 @@
 #include "gstfakesink.h"
 #include "gstfakesrc.h"
 #include "gstfdsrc.h"
+#include "gstfdsink.h"
 #include "gstfilesink.h"
 #include "gstfilesrc.h"
 #include "gstidentity.h"
@@ -54,6 +55,7 @@ static struct _elements_entry _elements[] = {
   {"fakesink", GST_RANK_NONE, gst_fake_sink_get_type},
 #if HAVE_SYS_SOCKET_H
   {"fdsrc", GST_RANK_NONE, gst_fd_src_get_type},
+  {"fdsink", GST_RANK_NONE, gst_fd_sink_get_type},
 #endif
   {"filesrc", GST_RANK_NONE, gst_file_src_get_type},
   {"identity", GST_RANK_NONE, gst_identity_get_type},
index da88c85..8113ca3 100644 (file)
 #  include "config.h"
 #endif
 
-#include "gstfdsink.h"
+#include "../../gst/gst-i18n-lib.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <stdio.h>
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
+#ifdef _MSC_VER
+#include <io.h>
+#endif
+#include <errno.h>
+#include <string.h>
+
+#include "gstfdsink.h"
+
+/* We add a control socket as in fdsrc to make it shutdown quickly when it's blocking on the fd.
+ * Select is used to determine when the fd is ready for use. When the element state is changed,
+ * it happens from another thread while fdsink is select'ing on the fd. The state-change thread 
+ * sends a control message, so fdsink wakes up and changes state immediately otherwise
+ * it would stay blocked until it receives some data. */
+
+/* the select call is also performed on the control sockets, that way
+ * we can send special commands to unblock the select call */
+#define CONTROL_STOP            'S'     /* stop the select call */
+#define CONTROL_SOCKETS(sink)   sink->control_sock
+#define WRITE_SOCKET(sink)      sink->control_sock[1]
+#define READ_SOCKET(sink)       sink->control_sock[0]
+
+#define SEND_COMMAND(sink, command)          \
+G_STMT_START {                              \
+  unsigned char c; c = command;             \
+  write (WRITE_SOCKET(sink), &c, 1);         \
+} G_STMT_END
+
+#define READ_COMMAND(sink, command, res)        \
+G_STMT_START {                                 \
+  res = read(READ_SOCKET(sink), &command, 1);   \
+} G_STMT_END
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -65,23 +102,42 @@ enum
   ARG_FD
 };
 
+static void gst_fd_sink_uri_handler_init (gpointer g_iface,
+    gpointer iface_data);
 
-#define _do_init(bla) \
-    GST_DEBUG_CATEGORY_INIT (gst_fd_sink__debug, "fdsink", 0, "fdsink element");
+static void
+_do_init (GType gst_fd_sink_type)
+{
+  static const GInterfaceInfo urihandler_info = {
+    gst_fd_sink_uri_handler_init,
+    NULL,
+    NULL
+  };
+
+  g_type_add_interface_static (gst_fd_sink_type, GST_TYPE_URI_HANDLER,
+      &urihandler_info);
 
-GST_BOILERPLATE_FULL (GstFdSink, gst_fd_sink_, GstElement, GST_TYPE_ELEMENT,
+  GST_DEBUG_CATEGORY_INIT (gst_fd_sink__debug, "fdsink", 0, "fdsink element");
+}
+
+GST_BOILERPLATE_FULL (GstFdSink, gst_fd_sink, GstBaseSink, GST_TYPE_BASE_SINK,
     _do_init);
 
-static void gst_fd_sink__set_property (GObject * object, guint prop_id,
+static void gst_fd_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_fd_sink_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
+static void gst_fd_sink_dispose (GObject * obj);
 
-static void gst_fd_sink__chain (GstPad * pad, GstData * _data);
-
+static gboolean gst_fd_sink_query (GstPad * pad, GstQuery * query);
+static GstFlowReturn gst_fd_sink_render (GstBaseSink * sink,
+    GstBuffer * buffer);
+static gboolean gst_fd_sink_start (GstBaseSink * basesink);
+static gboolean gst_fd_sink_stop (GstBaseSink * basesink);
+static gboolean gst_fd_sink_unlock (GstBaseSink * basesink);
 
 static void
-gst_fd_sink__base_init (gpointer g_class)
+gst_fd_sink_base_init (gpointer g_class)
 {
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
 
@@ -89,15 +145,26 @@ gst_fd_sink__base_init (gpointer g_class)
       gst_static_pad_template_get (&sinktemplate));
   gst_element_class_set_details (gstelement_class, &gst_fd_sink__details);
 }
+
 static void
-gst_fd_sink__class_init (GstFdSinkClass * klass)
+gst_fd_sink_class_init (GstFdSinkClass * klass)
 {
   GObjectClass *gobject_class;
+  GstBaseSinkClass *gstbasesink_class;
 
   gobject_class = G_OBJECT_CLASS (klass);
+  gstbasesink_class = GST_BASE_SINK_CLASS (klass);
 
-  gobject_class->set_property = gst_fd_sink__set_property;
+  gobject_class->set_property = gst_fd_sink_set_property;
   gobject_class->get_property = gst_fd_sink_get_property;
+  gobject_class->dispose = gst_fd_sink_dispose;
+
+  gstbasesink_class->get_times = NULL;
+  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_fd_sink_render);
+  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_fd_sink_start);
+  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_fd_sink_stop);
+  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_sink_unlock);
+  gstbasesink_class->event = NULL;
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_FD,
       g_param_spec_int ("fd", "fd", "An open file descriptor to write to",
@@ -105,42 +172,255 @@ gst_fd_sink__class_init (GstFdSinkClass * klass)
 }
 
 static void
-gst_fd_sink__init (GstFdSink * fdsink)
+gst_fd_sink_init (GstFdSink * fdsink, GstFdSinkClass * klass)
 {
-  fdsink->sinkpad =
-      gst_pad_new_from_template (gst_static_pad_template_get (&sinktemplate),
-      "sink");
-  gst_element_add_pad (GST_ELEMENT (fdsink), fdsink->sinkpad);
-  gst_pad_set_chain_function (fdsink->sinkpad, gst_fd_sink__chain);
+  GstPad *pad;
+
+  pad = GST_BASE_SINK_PAD (fdsink);
+  gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_fd_sink_query));
 
   fdsink->fd = 1;
+  fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
+  fdsink->bytes_written = 0;
+
+  GST_BASE_SINK (fdsink)->sync = TRUE;
 }
 
 static void
-gst_fd_sink__chain (GstPad * pad, GstData * _data)
+gst_fd_sink_dispose (GObject * obj)
+{
+  GstFdSink *fdsink = GST_FD_SINK (obj);
+
+  g_free (fdsink->uri);
+  fdsink->uri = NULL;
+
+  G_OBJECT_CLASS (parent_class)->dispose (obj);
+}
+
+static gboolean
+gst_fd_sink_query (GstPad * pad, GstQuery * query)
+{
+  GstFdSink *fdsink;
+  GstFormat format;
+
+  fdsink = GST_FD_SINK (GST_PAD_PARENT (pad));
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_POSITION:
+      gst_query_parse_position (query, &format, NULL);
+      switch (format) {
+        case GST_FORMAT_DEFAULT:
+        case GST_FORMAT_BYTES:
+          gst_query_set_position (query, GST_FORMAT_BYTES,
+              fdsink->bytes_written);
+          return TRUE;
+        default:
+          return FALSE;
+      }
+
+    case GST_QUERY_FORMATS:
+      gst_query_set_formats (query, 2, GST_FORMAT_DEFAULT, GST_FORMAT_BYTES);
+      return TRUE;
+
+    default:
+      return gst_pad_query_default (pad, query);
+  }
+}
+
+static GstFlowReturn
+gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
+{
+  GstFdSink *fdsink;
+
+#ifndef HAVE_WIN32
+  fd_set readfds;
+  fd_set writefds;
+  gint retval;
+#endif
+
+  fdsink = GST_FD_SINK (sink);
+
+  g_return_val_if_fail (fdsink->fd >= 0, GST_FLOW_ERROR);
+
+#ifndef HAVE_WIN32
+  FD_ZERO (&readfds);
+  FD_SET (READ_SOCKET (fdsink), &readfds);
+
+  FD_ZERO (&writefds);
+  FD_SET (fdsink->fd, &writefds);
+
+  do {
+    retval = select (FD_SETSIZE, &readfds, &writefds, NULL, NULL);
+  } while ((retval == -1 && errno == EINTR));
+
+  if (retval == -1)
+    goto select_error;
+
+  if (FD_ISSET (READ_SOCKET (fdsink), &readfds)) {
+    /* read all stop commands */
+    while (TRUE) {
+      gchar command;
+      int res;
+
+      READ_COMMAND (fdsink, command, res);
+      if (res < 0) {
+        GST_LOG_OBJECT (fdsink, "no more commands");
+        /* no more commands */
+        break;
+      }
+    }
+    goto stopped;
+  }
+#endif
+
+  if (GST_BUFFER_DATA (buffer)) {
+    guint bytes_written;
+
+    GST_DEBUG ("writing %d bytes to file descriptor %d",
+        GST_BUFFER_SIZE (buffer), fdsink->fd);
+    bytes_written =
+        write (fdsink->fd, GST_BUFFER_DATA (buffer), GST_BUFFER_SIZE (buffer));
+    fdsink->bytes_written += bytes_written;
+    if (bytes_written != GST_BUFFER_SIZE (buffer)) {
+      GST_ELEMENT_ERROR (fdsink, RESOURCE, WRITE,
+          (_("Error while writing to file descriptor \"%d\"."), fdsink->fd),
+          ("%s", g_strerror (errno)));
+      return GST_FLOW_ERROR;
+    }
+  }
+
+  return GST_FLOW_OK;
+
+#ifndef HAVE_WIN32
+select_error:
+  {
+    GST_ELEMENT_ERROR (fdsink, RESOURCE, READ, (NULL),
+        ("select on file descriptor: %s.", g_strerror (errno)));
+    GST_DEBUG_OBJECT (fdsink, "Error during select");
+    return GST_FLOW_ERROR;
+  }
+stopped:
+  {
+    GST_DEBUG_OBJECT (fdsink, "Select stopped");
+    return GST_FLOW_WRONG_STATE;
+  }
+#endif
+}
+
+static gboolean
+gst_fd_sink_check_fd (GstFdSink * fdsink, int fd)
+{
+  struct stat stat_results;
+  off_t result;
+
+  /* see that it is a valid file descriptor */
+  if (fstat (fd, &stat_results) < 0)
+    goto invalid;
+
+  if (!S_ISREG (stat_results.st_mode))
+    goto not_seekable;
+
+  /* see if it is a seekable stream */
+  result = lseek (fd, 0, SEEK_CUR);
+  if (result == -1) {
+    switch (errno) {
+      case EINVAL:
+      case EBADF:
+        goto invalid;
+
+      case ESPIPE:
+        goto not_seekable;
+    }
+  } else
+    GST_DEBUG ("File descriptor \"%d\" is seekable", fd);
+
+  return TRUE;
+
+invalid:
+  GST_ELEMENT_ERROR (fdsink, RESOURCE, WRITE,
+      (_("File descriptor \"%d\" is not valid."), fd),
+      ("%s", g_strerror (errno)));
+  return FALSE;
+
+not_seekable:
+  GST_DEBUG ("File descriptor \"%d\" is a pipe", fd);
+  return TRUE;
+}
+
+static gboolean
+gst_fd_sink_start (GstBaseSink * basesink)
 {
-  GstBuffer *buf = GST_BUFFER (_data);
   GstFdSink *fdsink;
+  gint control_sock[2];
+
+  fdsink = GST_FD_SINK (basesink);
+  if (!gst_fd_sink_check_fd (fdsink, fdsink->fd))
+    return FALSE;
+
+  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+    goto socket_pair;
+
+  READ_SOCKET (fdsink) = control_sock[0];
+  WRITE_SOCKET (fdsink) = control_sock[1];
+
+  fcntl (READ_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
+  fcntl (WRITE_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
+
+  return TRUE;
+
+  /* ERRORS */
+socket_pair:
+  {
+    GST_ELEMENT_ERROR (fdsink, RESOURCE, OPEN_READ_WRITE, (NULL),
+        GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+}
+
+static gboolean
+gst_fd_sink_stop (GstBaseSink * basesink)
+{
+  GstFdSink *fdsink = GST_FD_SINK (basesink);
+
+  close (READ_SOCKET (fdsink));
+  close (WRITE_SOCKET (fdsink));
+
+  return TRUE;
+}
+
+static gboolean
+gst_fd_sink_unlock (GstBaseSink * basesink)
+{
+  GstFdSink *fdsink = GST_FD_SINK (basesink);
 
-  g_return_if_fail (pad != NULL);
-  g_return_if_fail (GST_IS_PAD (pad));
-  g_return_if_fail (buf != NULL);
+  SEND_COMMAND (fdsink, CONTROL_STOP);
 
-  fdsink = GST_FD_SINK (gst_pad_get_parent (pad));
+  return TRUE;
+}
+
+static gboolean
+gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd)
+{
+  if (new_fd < 0)
+    return FALSE;
 
-  g_return_if_fail (fdsink->fd >= 0);
+  GST_OBJECT_LOCK (fdsink);
 
-  if (GST_BUFFER_DATA (buf)) {
-    GST_DEBUG ("writing %d bytes to file descriptor %d", GST_BUFFER_SIZE (buf),
-        fdsink->fd);
-    write (fdsink->fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
+  if (!gst_fd_sink_check_fd (fdsink, new_fd)) {
+    GST_OBJECT_UNLOCK (fdsink);
+    return FALSE;
   }
 
-  gst_buffer_unref (buf);
+  fdsink->fd = new_fd;
+  g_free (fdsink->uri);
+  fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
+
+  GST_OBJECT_UNLOCK (fdsink);
+  return TRUE;
 }
 
 static void
-gst_fd_sink__set_property (GObject * object, guint prop_id,
+gst_fd_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstFdSink *fdsink;
@@ -150,9 +430,13 @@ gst_fd_sink__set_property (GObject * object, guint prop_id,
   fdsink = GST_FD_SINK (object);
 
   switch (prop_id) {
-    case ARG_FD:
-      fdsink->fd = g_value_get_int (value);
+    case ARG_FD:{
+      int fd;
+
+      fd = g_value_get_int (value);
+      gst_fd_sink_update_fd (fdsink, fd);
       break;
+    }
     default:
       break;
   }
@@ -176,3 +460,56 @@ gst_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
       break;
   }
 }
+
+/*** GSTURIHANDLER INTERFACE *************************************************/
+
+static guint
+gst_fd_sink_uri_get_type (void)
+{
+  return GST_URI_SINK;
+}
+static gchar **
+gst_fd_sink_uri_get_protocols (void)
+{
+  static gchar *protocols[] = { "fd", NULL };
+
+  return protocols;
+}
+static const gchar *
+gst_fd_sink_uri_get_uri (GstURIHandler * handler)
+{
+  GstFdSink *sink = GST_FD_SINK (handler);
+
+  return sink->uri;
+}
+
+static gboolean
+gst_fd_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri)
+{
+  gchar *protocol;
+  GstFdSink *sink = GST_FD_SINK (handler);
+  gint fd;
+
+  protocol = gst_uri_get_protocol (uri);
+  if (strcmp (protocol, "fd") != 0) {
+    g_free (protocol);
+    return FALSE;
+  }
+  g_free (protocol);
+
+  if (sscanf (uri, "fd://%d", &fd) != 1)
+    return FALSE;
+
+  return gst_fd_sink_update_fd (sink, fd);
+}
+
+static void
+gst_fd_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
+{
+  GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
+
+  iface->get_type = gst_fd_sink_uri_get_type;
+  iface->get_protocols = gst_fd_sink_uri_get_protocols;
+  iface->get_uri = gst_fd_sink_uri_get_uri;
+  iface->set_uri = gst_fd_sink_uri_set_uri;
+}
index 787abb3..5deba3c 100644 (file)
@@ -25,6 +25,7 @@
 #define __GST_FD_SINK_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
 
 G_BEGIN_DECLS
 
@@ -44,15 +45,18 @@ typedef struct _GstFdSink GstFdSink;
 typedef struct _GstFdSinkClass GstFdSinkClass;
 
 struct _GstFdSink {
-  GstElement element;
+  GstBaseSink parent;
 
-  GstPad *sinkpad;
+  gchar *uri;
+
+  gint control_sock[2];
 
   int fd;
+  guint64 bytes_written;
 };
 
 struct _GstFdSinkClass {
-  GstElementClass parent_class;
+  GstBaseSinkClass parent_class;
 };
 
 GType gst_fd_sink_get_type(void);