Rework GstSegment handling
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsrc.c
index a415414..8496b10 100644 (file)
  */
 /**
  * SECTION:element-fdsrc
- * @short_description: read from a unix file descriptor
  * @see_also: #GstFdSink
  *
  * Read data from a unix file descriptor.
+ * 
+ * To generate data, enter some data on the console folowed by enter.
+ * The above mentioned pipeline should dump data packets to the console.
+ * 
+ * If the #GstFdSrc:timeout property is set to a value bigger than 0, fdsrc will
+ * generate an element message named
+ * <classname>&quot;GstFdSrcTimeout&quot;</classname>
+ * if no data was recieved in the given timeout.
+ * The message's structure contains one field:
+ * <itemizedlist>
+ * <listitem>
+ *   <para>
+ *   #guint64
+ *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
+ *   expired when waiting for data.
+ *   </para>
+ * </listitem>
+ * </itemizedlist>
+ * 
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * echo "Hello GStreamer" | gst-launch -v fdsrc ! fakesink dump=true
+ * ]| A simple pipeline to read from the standard input and dump the data
+ * with a fakesink as hex ascii block.
+ * </refsect2>
+ * 
+ * Last reviewed on 2008-06-20 (0.10.21)
  */
 
-
 #ifdef HAVE_CONFIG_H
 #  include "config.h"
 #endif
 #include "gst/gst_private.h"
 
 #include <sys/types.h>
+
+#ifdef G_OS_WIN32
+#include <io.h>                 /* lseek, open, close, read */
+#undef lseek
+#define lseek _lseeki64
+#undef off_t
+#define off_t guint64
+#endif
+
 #include <sys/stat.h>
+#ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
+#endif
 #include <fcntl.h>
 #include <stdio.h>
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
 #ifdef _MSC_VER
-#include <io.h>
+#undef stat
+#define stat _stat
+#define fstat _fstat
+#define S_ISREG(m)     (((m)&S_IFREG)==S_IFREG)
 #endif
 #include <stdlib.h>
 #include <errno.h>
 
 #include "gstfdsrc.h"
 
-#define DEFAULT_BLOCKSIZE       4096
-
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
@@ -60,31 +98,26 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
 GST_DEBUG_CATEGORY_STATIC (gst_fd_src_debug);
 #define GST_CAT_DEFAULT gst_fd_src_debug
 
+#define DEFAULT_FD              0
+#define DEFAULT_TIMEOUT         0
+
 enum
 {
   PROP_0,
+
   PROP_FD,
+  PROP_TIMEOUT,
+
+  PROP_LAST
 };
 
 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
 
-static void
-_do_init (GType fd_src_type)
-{
-  static const GInterfaceInfo urihandler_info = {
-    gst_fd_src_uri_handler_init,
-    NULL,
-    NULL
-  };
-
-  g_type_add_interface_static (fd_src_type, GST_TYPE_URI_HANDLER,
-      &urihandler_info);
-
+#define _do_init \
+  G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_fd_src_uri_handler_init); \
   GST_DEBUG_CATEGORY_INIT (gst_fd_src_debug, "fdsrc", 0, "fdsrc element");
-}
-
-GST_BOILERPLATE_FULL (GstFdSrc, gst_fd_src, GstPushSrc, GST_TYPE_PUSH_SRC,
-    _do_init);
+#define gst_fd_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstFdSrc, gst_fd_src, GST_TYPE_PUSH_SRC, _do_init);
 
 static void gst_fd_src_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -99,28 +132,16 @@ static gboolean gst_fd_src_unlock_stop (GstBaseSrc * bsrc);
 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
 static gboolean gst_fd_src_do_seek (GstBaseSrc * src, GstSegment * segment);
+static gboolean gst_fd_src_query (GstBaseSrc * src, GstQuery ** query);
 
 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
 
 static void
-gst_fd_src_base_init (gpointer g_class)
-{
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_set_details_simple (gstelement_class,
-      "Filedescriptor Source",
-      "Source/File",
-      "Read from a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-}
-
-static void
 gst_fd_src_class_init (GstFdSrcClass * klass)
 {
   GObjectClass *gobject_class;
-  GstBaseSrcClass *gstbasesrc_class;
   GstElementClass *gstelement_class;
+  GstBaseSrcClass *gstbasesrc_class;
   GstPushSrcClass *gstpush_src_class;
 
   gobject_class = G_OBJECT_CLASS (klass);
@@ -128,15 +149,32 @@ gst_fd_src_class_init (GstFdSrcClass * klass)
   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
   gstpush_src_class = GST_PUSH_SRC_CLASS (klass);
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->set_property = gst_fd_src_set_property;
   gobject_class->get_property = gst_fd_src_get_property;
   gobject_class->dispose = gst_fd_src_dispose;
 
   g_object_class_install_property (gobject_class, PROP_FD,
       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
-          0, G_MAXINT, 0, G_PARAM_READWRITE));
+          0, G_MAXINT, DEFAULT_FD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstFdSrc:timeout
+   *
+   * Post a message after timeout microseconds
+   *
+   * Since: 0.10.21
+   */
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
+      g_param_spec_uint64 ("timeout", "Timeout",
+          "Post a message after timeout microseconds (0 = disabled)", 0,
+          G_MAXUINT64, DEFAULT_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  gst_element_class_set_details_simple (gstelement_class,
+      "Filedescriptor Source",
+      "Source/File",
+      "Read from a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&srctemplate));
 
   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
@@ -145,16 +183,19 @@ gst_fd_src_class_init (GstFdSrcClass * klass)
   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_fd_src_do_seek);
+  gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_fd_src_query);
 
   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
 }
 
 static void
-gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
+gst_fd_src_init (GstFdSrc * fdsrc)
 {
-  fdsrc->fd = -1;
-  fdsrc->new_fd = 0;
+  fdsrc->new_fd = DEFAULT_FD;
   fdsrc->seekable_fd = FALSE;
+  fdsrc->fd = -1;
+  fdsrc->size = -1;
+  fdsrc->timeout = DEFAULT_TIMEOUT;
   fdsrc->uri = g_strdup_printf ("fd://0");
   fdsrc->curoffset = 0;
 }
@@ -171,33 +212,38 @@ gst_fd_src_dispose (GObject * obj)
 }
 
 static void
-gst_fd_src_update_fd (GstFdSrc * src)
+gst_fd_src_update_fd (GstFdSrc * src, guint64 size)
 {
   struct stat stat_results;
 
+  GST_DEBUG_OBJECT (src, "fdset %p, old_fd %d, new_fd %d", src->fdset, src->fd,
+      src->new_fd);
+
   /* we need to always update the fdset since it may not have existed when
-   * gst_fd_src_update_fd() was called earlier */
+   * gst_fd_src_update_fd () was called earlier */
   if (src->fdset != NULL) {
-    GstPollFD fd = { 0, };
+    GstPollFD fd = GST_POLL_FD_INIT;
 
     if (src->fd >= 0) {
       fd.fd = src->fd;
+      /* this will log a harmless warning, if it was never added */
       gst_poll_remove_fd (src->fdset, &fd);
     }
 
-    /* Reset the GstPollFD */
-    memset (&fd, 0, sizeof (GstPollFD));
-
     fd.fd = src->new_fd;
     gst_poll_add_fd (src->fdset, &fd);
     gst_poll_fd_ctl_read (src->fdset, &fd, TRUE);
   }
 
+
   if (src->fd != src->new_fd) {
     GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
 
     src->fd = src->new_fd;
 
+    GST_INFO_OBJECT (src, "Setting size to fd %" G_GUINT64_FORMAT, size);
+    src->size = size;
+
     g_free (src->uri);
     src->uri = g_strdup_printf ("fd://%d", src->fd);
 
@@ -230,10 +276,10 @@ gst_fd_src_start (GstBaseSrc * bsrc)
 
   src->curoffset = 0;
 
-  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
+  if ((src->fdset = gst_poll_new (TRUE)) == NULL)
     goto socket_pair;
 
-  gst_fd_src_update_fd (src);
+  gst_fd_src_update_fd (src, -1);
 
   return TRUE;
 
@@ -300,12 +346,17 @@ gst_fd_src_set_property (GObject * object, guint prop_id, const GValue * value,
       GST_OBJECT_LOCK (object);
       if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
         GST_DEBUG_OBJECT (src, "state ready or lower, updating to use new fd");
-        gst_fd_src_update_fd (src);
+        gst_fd_src_update_fd (src, -1);
       } else {
         GST_DEBUG_OBJECT (src, "state above ready, not updating to new fd yet");
       }
       GST_OBJECT_UNLOCK (object);
       break;
+    case PROP_TIMEOUT:
+      src->timeout = g_value_get_uint64 (value);
+      GST_DEBUG_OBJECT (src, "poll timeout set to %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (src->timeout));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -322,6 +373,9 @@ gst_fd_src_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_FD:
       g_value_set_int (value, src->fd);
       break;
+    case PROP_TIMEOUT:
+      g_value_set_uint64 (value, src->timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -335,44 +389,76 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   GstBuffer *buf;
   gssize readbytes;
   guint blocksize;
+  GstClockTime timeout;
+  guint8 *data;
+  gsize maxsize;
 
 #ifndef HAVE_WIN32
+  gboolean try_again;
   gint retval;
 #endif
 
   src = GST_FD_SRC (psrc);
 
+  if (src->timeout > 0) {
+    timeout = src->timeout * GST_USECOND;
+  } else {
+    timeout = GST_CLOCK_TIME_NONE;
+  }
+
 #ifndef HAVE_WIN32
   do {
-    retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE);
-  } while (retval == -1 && errno == EINTR);
-
-  if (retval == -1) {
-    if (errno == EBUSY)
-      goto stopped;
-    else
-      goto select_error;
-  }
+    try_again = FALSE;
+
+    GST_LOG_OBJECT (src, "doing poll, timeout %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (src->timeout));
+
+    retval = gst_poll_wait (src->fdset, timeout);
+    GST_LOG_OBJECT (src, "poll returned %d", retval);
+
+    if (G_UNLIKELY (retval == -1)) {
+      if (errno == EINTR || errno == EAGAIN) {
+        /* retry if interrupted */
+        try_again = TRUE;
+      } else if (errno == EBUSY) {
+        goto stopped;
+      } else {
+        goto poll_error;
+      }
+    } else if (G_UNLIKELY (retval == 0)) {
+      try_again = TRUE;
+      /* timeout, post element message */
+      gst_element_post_message (GST_ELEMENT_CAST (src),
+          gst_message_new_element (GST_OBJECT_CAST (src),
+              gst_structure_new ("GstFdSrcTimeout",
+                  "timeout", G_TYPE_UINT64, src->timeout, NULL)));
+    }
+  } while (G_UNLIKELY (try_again));     /* retry if interrupted or timeout */
 #endif
 
   blocksize = GST_BASE_SRC (src)->blocksize;
 
   /* create the buffer */
   buf = gst_buffer_new_and_alloc (blocksize);
+  if (G_UNLIKELY (buf == NULL))
+    goto alloc_failed;
+
+  data = gst_buffer_map (buf, NULL, &maxsize, GST_MAP_WRITE);
 
   do {
-    readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
+    readbytes = read (src->fd, data, blocksize);
     GST_LOG_OBJECT (src, "read %" G_GSSIZE_FORMAT, readbytes);
   } while (readbytes == -1 && errno == EINTR);  /* retry if interrupted */
 
   if (readbytes < 0)
     goto read_error;
 
+  gst_buffer_unmap (buf, data, readbytes);
+
   if (readbytes == 0)
     goto eos;
 
   GST_BUFFER_OFFSET (buf) = src->curoffset;
-  GST_BUFFER_SIZE (buf) = readbytes;
   GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE;
   src->curoffset += readbytes;
 
@@ -385,19 +471,24 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   /* ERRORS */
 #ifndef HAVE_WIN32
-select_error:
+poll_error:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("select on file descriptor: %s.", g_strerror (errno)));
-    GST_DEBUG_OBJECT (psrc, "Error during select");
+        ("poll on file descriptor: %s.", g_strerror (errno)));
+    GST_DEBUG_OBJECT (psrc, "Error during poll");
     return GST_FLOW_ERROR;
   }
 stopped:
   {
-    GST_DEBUG_OBJECT (psrc, "Select stopped");
+    GST_DEBUG_OBJECT (psrc, "Poll stopped");
     return GST_FLOW_WRONG_STATE;
   }
 #endif
+alloc_failed:
+  {
+    GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", blocksize);
+    return GST_FLOW_ERROR;
+  }
 eos:
   {
     GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS.");
@@ -409,12 +500,35 @@ read_error:
     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
         ("read on file descriptor: %s.", g_strerror (errno)));
     GST_DEBUG_OBJECT (psrc, "Error reading from fd");
+    gst_buffer_unmap (buf, data, 0);
     gst_buffer_unref (buf);
     return GST_FLOW_ERROR;
   }
 }
 
-gboolean
+static gboolean
+gst_fd_src_query (GstBaseSrc * basesrc, GstQuery ** query)
+{
+  gboolean ret = FALSE;
+  GstFdSrc *src = GST_FD_SRC (basesrc);
+
+  switch (GST_QUERY_TYPE (*query)) {
+    case GST_QUERY_URI:
+      gst_query_set_uri (*query, src->uri);
+      ret = TRUE;
+      break;
+    default:
+      ret = FALSE;
+      break;
+  }
+
+  if (!ret)
+    ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+
+  return ret;
+}
+
+static gboolean
 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
@@ -422,12 +536,17 @@ gst_fd_src_is_seekable (GstBaseSrc * bsrc)
   return src->seekable_fd;
 }
 
-gboolean
+static gboolean
 gst_fd_src_get_size (GstBaseSrc * bsrc, guint64 * size)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
   struct stat stat_results;
 
+  if (src->size != -1) {
+    *size = src->size;
+    return TRUE;
+  }
+
   if (!src->seekable_fd) {
     /* If it isn't seekable, we won't know the length (but fstat will still
      * succeed, and wrongly say our length is zero. */
@@ -448,7 +567,7 @@ could_not_stat:
   }
 }
 
-gboolean
+static gboolean
 gst_fd_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
 {
   gint res;
@@ -465,7 +584,7 @@ gst_fd_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
   if (G_UNLIKELY (res < 0 || res != offset))
     goto seek_failed;
 
-  segment->last_stop = segment->start;
+  segment->position = segment->start;
   segment->time = segment->start;
 
   return TRUE;
@@ -482,13 +601,15 @@ gst_fd_src_uri_get_type (void)
 {
   return GST_URI_SRC;
 }
+
 static gchar **
 gst_fd_src_uri_get_protocols (void)
 {
-  static gchar *protocols[] = { "fd", NULL };
+  static gchar *protocols[] = { (char *) "fd", NULL };
 
   return protocols;
 }
+
 static const gchar *
 gst_fd_src_uri_get_uri (GstURIHandler * handler)
 {
@@ -500,9 +621,12 @@ gst_fd_src_uri_get_uri (GstURIHandler * handler)
 static gboolean
 gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
 {
-  gchar *protocol;
+  gchar *protocol, *q;
   GstFdSrc *src = GST_FD_SRC (handler);
   gint fd;
+  guint64 size = -1;
+
+  GST_INFO_OBJECT (src, "checking uri %s", uri);
 
   protocol = gst_uri_get_protocol (uri);
   if (strcmp (protocol, "fd") != 0) {
@@ -514,11 +638,26 @@ gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
   if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0)
     return FALSE;
 
+  if ((q = g_strstr_len (uri, -1, "?"))) {
+    gchar *sp;
+
+    GST_INFO_OBJECT (src, "found ?");
+
+    if ((sp = g_strstr_len (q, -1, "size="))) {
+      if (sscanf (sp, "size=%" G_GUINT64_FORMAT, &size) != 1) {
+        GST_INFO_OBJECT (src, "parsing size failed");
+        size = -1;
+      } else {
+        GST_INFO_OBJECT (src, "found size %" G_GUINT64_FORMAT, size);
+      }
+    }
+  }
+
   src->new_fd = fd;
 
   GST_OBJECT_LOCK (src);
   if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
-    gst_fd_src_update_fd (src);
+    gst_fd_src_update_fd (src, size);
   }
   GST_OBJECT_UNLOCK (src);