Rework GstSegment handling
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsrc.c
index 5e2c0f5..8496b10 100644 (file)
  */
 /**
  * SECTION:element-fdsrc
- * @short_description: read from a unix file descriptor
  * @see_also: #GstFdSink
  *
- * <refsect2>
- * <para>
  * Read data from a unix file descriptor.
- * </para>
- * <title>Examples</title>
- * <para>
- * Here is a simple pipeline to read from the standard input and dump the data
- * with a fakesink.
- * <programlisting>
- * gst-launch -v fdsrc ! fakesink dump=1
- * </programlisting>
+ * 
  * To generate data, enter some data on the console folowed by enter.
  * The above mentioned pipeline should dump data packets to the console.
- * </para>
- * <para>
- * If the <link linkend="GstFdSrc--timeout">timeout property</link> is set to a
- * value bigger than 0, fdsrc will generate an element message named
+ * 
+ * If the #GstFdSrc:timeout property is set to a value bigger than 0, fdsrc will
+ * generate an element message named
  * <classname>&quot;GstFdSrcTimeout&quot;</classname>
  * if no data was recieved in the given timeout.
  * The message's structure contains one field:
  *   </para>
  * </listitem>
  * </itemizedlist>
- * </para>
- * <para>
- * Last reviewed on 2008-06-20 (0.10.21)
- * </para>
+ * 
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * echo "Hello GStreamer" | gst-launch -v fdsrc ! fakesink dump=true
+ * ]| A simple pipeline to read from the standard input and dump the data
+ * with a fakesink as hex ascii block.
  * </refsect2>
+ * 
+ * Last reviewed on 2008-06-20 (0.10.21)
  */
 
 #ifdef HAVE_CONFIG_H
 #include "gst/gst_private.h"
 
 #include <sys/types.h>
+
+#ifdef G_OS_WIN32
+#include <io.h>                 /* lseek, open, close, read */
+#undef lseek
+#define lseek _lseeki64
+#undef off_t
+#define off_t guint64
+#endif
+
 #include <sys/stat.h>
+#ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
+#endif
 #include <fcntl.h>
 #include <stdio.h>
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
 #ifdef _MSC_VER
-#include <io.h>
+#undef stat
+#define stat _stat
+#define fstat _fstat
+#define S_ISREG(m)     (((m)&S_IFREG)==S_IFREG)
 #endif
 #include <stdlib.h>
 #include <errno.h>
@@ -105,23 +113,11 @@ enum
 
 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
 
-static void
-_do_init (GType fd_src_type)
-{
-  static const GInterfaceInfo urihandler_info = {
-    gst_fd_src_uri_handler_init,
-    NULL,
-    NULL
-  };
-
-  g_type_add_interface_static (fd_src_type, GST_TYPE_URI_HANDLER,
-      &urihandler_info);
-
+#define _do_init \
+  G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_fd_src_uri_handler_init); \
   GST_DEBUG_CATEGORY_INIT (gst_fd_src_debug, "fdsrc", 0, "fdsrc element");
-}
-
-GST_BOILERPLATE_FULL (GstFdSrc, gst_fd_src, GstPushSrc, GST_TYPE_PUSH_SRC,
-    _do_init);
+#define gst_fd_src_parent_class parent_class
+G_DEFINE_TYPE_WITH_CODE (GstFdSrc, gst_fd_src, GST_TYPE_PUSH_SRC, _do_init);
 
 static void gst_fd_src_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -136,28 +132,16 @@ static gboolean gst_fd_src_unlock_stop (GstBaseSrc * bsrc);
 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
 static gboolean gst_fd_src_do_seek (GstBaseSrc * src, GstSegment * segment);
+static gboolean gst_fd_src_query (GstBaseSrc * src, GstQuery ** query);
 
 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
 
 static void
-gst_fd_src_base_init (gpointer g_class)
-{
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_set_details_simple (gstelement_class,
-      "Filedescriptor Source",
-      "Source/File",
-      "Read from a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-}
-
-static void
 gst_fd_src_class_init (GstFdSrcClass * klass)
 {
   GObjectClass *gobject_class;
-  GstBaseSrcClass *gstbasesrc_class;
   GstElementClass *gstelement_class;
+  GstBaseSrcClass *gstbasesrc_class;
   GstPushSrcClass *gstpush_src_class;
 
   gobject_class = G_OBJECT_CLASS (klass);
@@ -165,8 +149,6 @@ gst_fd_src_class_init (GstFdSrcClass * klass)
   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
   gstpush_src_class = GST_PUSH_SRC_CLASS (klass);
 
-  parent_class = g_type_class_peek_parent (klass);
-
   gobject_class->set_property = gst_fd_src_set_property;
   gobject_class->get_property = gst_fd_src_get_property;
   gobject_class->dispose = gst_fd_src_dispose;
@@ -174,12 +156,26 @@ gst_fd_src_class_init (GstFdSrcClass * klass)
   g_object_class_install_property (gobject_class, PROP_FD,
       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
           0, G_MAXINT, DEFAULT_FD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstFdSrc:timeout
+   *
+   * Post a message after timeout microseconds
+   *
+   * Since: 0.10.21
+   */
   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
       g_param_spec_uint64 ("timeout", "Timeout",
           "Post a message after timeout microseconds (0 = disabled)", 0,
           G_MAXUINT64, DEFAULT_TIMEOUT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  gst_element_class_set_details_simple (gstelement_class,
+      "Filedescriptor Source",
+      "Source/File",
+      "Read from a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
+  gst_element_class_add_pad_template (gstelement_class,
+      gst_static_pad_template_get (&srctemplate));
+
   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_src_unlock);
@@ -187,16 +183,18 @@ gst_fd_src_class_init (GstFdSrcClass * klass)
   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_fd_src_do_seek);
+  gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_fd_src_query);
 
   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
 }
 
 static void
-gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
+gst_fd_src_init (GstFdSrc * fdsrc)
 {
-  fdsrc->new_fd = 0;
+  fdsrc->new_fd = DEFAULT_FD;
   fdsrc->seekable_fd = FALSE;
-  fdsrc->fd = DEFAULT_FD;
+  fdsrc->fd = -1;
+  fdsrc->size = -1;
   fdsrc->timeout = DEFAULT_TIMEOUT;
   fdsrc->uri = g_strdup_printf ("fd://0");
   fdsrc->curoffset = 0;
@@ -214,10 +212,13 @@ gst_fd_src_dispose (GObject * obj)
 }
 
 static void
-gst_fd_src_update_fd (GstFdSrc * src)
+gst_fd_src_update_fd (GstFdSrc * src, guint64 size)
 {
   struct stat stat_results;
 
+  GST_DEBUG_OBJECT (src, "fdset %p, old_fd %d, new_fd %d", src->fdset, src->fd,
+      src->new_fd);
+
   /* we need to always update the fdset since it may not have existed when
    * gst_fd_src_update_fd () was called earlier */
   if (src->fdset != NULL) {
@@ -225,6 +226,7 @@ gst_fd_src_update_fd (GstFdSrc * src)
 
     if (src->fd >= 0) {
       fd.fd = src->fd;
+      /* this will log a harmless warning, if it was never added */
       gst_poll_remove_fd (src->fdset, &fd);
     }
 
@@ -233,11 +235,15 @@ gst_fd_src_update_fd (GstFdSrc * src)
     gst_poll_fd_ctl_read (src->fdset, &fd, TRUE);
   }
 
+
   if (src->fd != src->new_fd) {
     GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
 
     src->fd = src->new_fd;
 
+    GST_INFO_OBJECT (src, "Setting size to fd %" G_GUINT64_FORMAT, size);
+    src->size = size;
+
     g_free (src->uri);
     src->uri = g_strdup_printf ("fd://%d", src->fd);
 
@@ -273,7 +279,7 @@ gst_fd_src_start (GstBaseSrc * bsrc)
   if ((src->fdset = gst_poll_new (TRUE)) == NULL)
     goto socket_pair;
 
-  gst_fd_src_update_fd (src);
+  gst_fd_src_update_fd (src, -1);
 
   return TRUE;
 
@@ -340,7 +346,7 @@ gst_fd_src_set_property (GObject * object, guint prop_id, const GValue * value,
       GST_OBJECT_LOCK (object);
       if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
         GST_DEBUG_OBJECT (src, "state ready or lower, updating to use new fd");
-        gst_fd_src_update_fd (src);
+        gst_fd_src_update_fd (src, -1);
       } else {
         GST_DEBUG_OBJECT (src, "state above ready, not updating to new fd yet");
       }
@@ -384,6 +390,8 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   gssize readbytes;
   guint blocksize;
   GstClockTime timeout;
+  guint8 *data;
+  gsize maxsize;
 
 #ifndef HAVE_WIN32
   gboolean try_again;
@@ -432,20 +440,25 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   /* create the buffer */
   buf = gst_buffer_new_and_alloc (blocksize);
+  if (G_UNLIKELY (buf == NULL))
+    goto alloc_failed;
+
+  data = gst_buffer_map (buf, NULL, &maxsize, GST_MAP_WRITE);
 
   do {
-    readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
+    readbytes = read (src->fd, data, blocksize);
     GST_LOG_OBJECT (src, "read %" G_GSSIZE_FORMAT, readbytes);
   } while (readbytes == -1 && errno == EINTR);  /* retry if interrupted */
 
   if (readbytes < 0)
     goto read_error;
 
+  gst_buffer_unmap (buf, data, readbytes);
+
   if (readbytes == 0)
     goto eos;
 
   GST_BUFFER_OFFSET (buf) = src->curoffset;
-  GST_BUFFER_SIZE (buf) = readbytes;
   GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE;
   src->curoffset += readbytes;
 
@@ -471,6 +484,11 @@ stopped:
     return GST_FLOW_WRONG_STATE;
   }
 #endif
+alloc_failed:
+  {
+    GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", blocksize);
+    return GST_FLOW_ERROR;
+  }
 eos:
   {
     GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS.");
@@ -482,12 +500,35 @@ read_error:
     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
         ("read on file descriptor: %s.", g_strerror (errno)));
     GST_DEBUG_OBJECT (psrc, "Error reading from fd");
+    gst_buffer_unmap (buf, data, 0);
     gst_buffer_unref (buf);
     return GST_FLOW_ERROR;
   }
 }
 
 static gboolean
+gst_fd_src_query (GstBaseSrc * basesrc, GstQuery ** query)
+{
+  gboolean ret = FALSE;
+  GstFdSrc *src = GST_FD_SRC (basesrc);
+
+  switch (GST_QUERY_TYPE (*query)) {
+    case GST_QUERY_URI:
+      gst_query_set_uri (*query, src->uri);
+      ret = TRUE;
+      break;
+    default:
+      ret = FALSE;
+      break;
+  }
+
+  if (!ret)
+    ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+
+  return ret;
+}
+
+static gboolean
 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
@@ -501,6 +542,11 @@ gst_fd_src_get_size (GstBaseSrc * bsrc, guint64 * size)
   GstFdSrc *src = GST_FD_SRC (bsrc);
   struct stat stat_results;
 
+  if (src->size != -1) {
+    *size = src->size;
+    return TRUE;
+  }
+
   if (!src->seekable_fd) {
     /* If it isn't seekable, we won't know the length (but fstat will still
      * succeed, and wrongly say our length is zero. */
@@ -538,7 +584,7 @@ gst_fd_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
   if (G_UNLIKELY (res < 0 || res != offset))
     goto seek_failed;
 
-  segment->last_stop = segment->start;
+  segment->position = segment->start;
   segment->time = segment->start;
 
   return TRUE;
@@ -555,13 +601,15 @@ gst_fd_src_uri_get_type (void)
 {
   return GST_URI_SRC;
 }
+
 static gchar **
 gst_fd_src_uri_get_protocols (void)
 {
-  static gchar *protocols[] = { "fd", NULL };
+  static gchar *protocols[] = { (char *) "fd", NULL };
 
   return protocols;
 }
+
 static const gchar *
 gst_fd_src_uri_get_uri (GstURIHandler * handler)
 {
@@ -573,9 +621,12 @@ gst_fd_src_uri_get_uri (GstURIHandler * handler)
 static gboolean
 gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
 {
-  gchar *protocol;
+  gchar *protocol, *q;
   GstFdSrc *src = GST_FD_SRC (handler);
   gint fd;
+  guint64 size = -1;
+
+  GST_INFO_OBJECT (src, "checking uri %s", uri);
 
   protocol = gst_uri_get_protocol (uri);
   if (strcmp (protocol, "fd") != 0) {
@@ -587,11 +638,26 @@ gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
   if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0)
     return FALSE;
 
+  if ((q = g_strstr_len (uri, -1, "?"))) {
+    gchar *sp;
+
+    GST_INFO_OBJECT (src, "found ?");
+
+    if ((sp = g_strstr_len (q, -1, "size="))) {
+      if (sscanf (sp, "size=%" G_GUINT64_FORMAT, &size) != 1) {
+        GST_INFO_OBJECT (src, "parsing size failed");
+        size = -1;
+      } else {
+        GST_INFO_OBJECT (src, "found size %" G_GUINT64_FORMAT, size);
+      }
+    }
+  }
+
   src->new_fd = fd;
 
   GST_OBJECT_LOCK (src);
   if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
-    gst_fd_src_update_fd (src);
+    gst_fd_src_update_fd (src, size);
   }
   GST_OBJECT_UNLOCK (src);