plugins/elements/: Port to GstPoll. See #505417.
authorPeter Kjellerstedt <pkj@axis.com>
Thu, 28 Feb 2008 10:18:02 +0000 (10:18 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 28 Feb 2008 10:18:02 +0000 (10:18 +0000)
Original commit message from CVS:
Patch by: Peter Kjellerstedt <pkj at axis dot com>
* plugins/elements/gstfdsink.c: (gst_fd_sink_render),
(gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock),
(gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd):
* plugins/elements/gstfdsink.h:
* plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd),
(gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock),
(gst_fd_src_unlock_stop), (gst_fd_src_create),
(gst_fd_src_uri_set_uri):
* plugins/elements/gstfdsrc.h:
Port to GstPoll. See #505417.

ChangeLog
plugins/elements/gstfdsink.c
plugins/elements/gstfdsink.h
plugins/elements/gstfdsrc.c
plugins/elements/gstfdsrc.h

index f63c81c..9699287 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,18 @@
+2008-02-28  Wim Taymans  <wim.taymans@collabora.co.uk>
+
+       Patch by: Peter Kjellerstedt <pkj at axis dot com>
+
+       * plugins/elements/gstfdsink.c: (gst_fd_sink_render),
+       (gst_fd_sink_start), (gst_fd_sink_stop), (gst_fd_sink_unlock),
+       (gst_fd_sink_unlock_stop), (gst_fd_sink_update_fd):
+       * plugins/elements/gstfdsink.h:
+       * plugins/elements/gstfdsrc.c: (gst_fd_src_update_fd),
+       (gst_fd_src_start), (gst_fd_src_stop), (gst_fd_src_unlock),
+       (gst_fd_src_unlock_stop), (gst_fd_src_create),
+       (gst_fd_src_uri_set_uri):
+       * plugins/elements/gstfdsrc.h:
+       Port to GstPoll. See #505417.
+
 2008-02-27  Jan Schmidt  <jan.schmidt@sun.com>
 
        * win32/common/libgstreamer.def:
index 6287501..a6970b1 100644 (file)
 
 #include "gstfdsink.h"
 
-/* We add a control socket as in fdsrc to make it shutdown quickly when it's blocking on the fd.
- * Select is used to determine when the fd is ready for use. When the element state is changed,
- * it happens from another thread while fdsink is select'ing on the fd. The state-change thread 
- * sends a control message, so fdsink wakes up and changes state immediately otherwise
- * it would stay blocked until it receives some data. */
-
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock the select call */
-#define CONTROL_STOP            'S'     /* stop the select call */
-#define CONTROL_SOCKETS(sink)   sink->control_sock
-#define WRITE_SOCKET(sink)      sink->control_sock[1]
-#define READ_SOCKET(sink)       sink->control_sock[0]
-
-#define SEND_COMMAND(sink, command)          \
-G_STMT_START {                              \
-  unsigned char c; c = command;             \
-  write (WRITE_SOCKET(sink), &c, 1);         \
-} G_STMT_END
-
-#define READ_COMMAND(sink, command, res)        \
-G_STMT_START {                                 \
-  res = read(READ_SOCKET(sink), &command, 1);   \
-} G_STMT_END
-
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
     GST_PAD_ALWAYS,
@@ -236,15 +212,13 @@ static GstFlowReturn
 gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
 {
   GstFdSink *fdsink;
+  guint8 *data;
+  guint size;
+  gint written;
 
 #ifndef HAVE_WIN32
-  fd_set readfds;
-  fd_set writefds;
   gint retval;
 #endif
-  guint8 *data;
-  guint size;
-  gint written;
 
   fdsink = GST_FD_SINK (sink);
 
@@ -255,24 +229,18 @@ gst_fd_sink_render (GstBaseSink * sink, GstBuffer * buffer)
 
 again:
 #ifndef HAVE_WIN32
-
-  FD_ZERO (&readfds);
-  FD_SET (READ_SOCKET (fdsink), &readfds);
-
-  FD_ZERO (&writefds);
-  FD_SET (fdsink->fd, &writefds);
-
   do {
     GST_DEBUG_OBJECT (fdsink, "going into select, have %d bytes to write",
         size);
-    retval = select (FD_SETSIZE, &readfds, &writefds, NULL, NULL);
-  } while ((retval == -1 && errno == EINTR));
-
-  if (retval == -1)
-    goto select_error;
-
-  if (FD_ISSET (READ_SOCKET (fdsink), &readfds))
-    goto stopped;
+    retval = gst_poll_wait (fdsink->fdset, GST_CLOCK_TIME_NONE);
+  } while (retval == -1 && errno == EINTR);
+
+  if (retval == -1) {
+    if (errno == EBUSY)
+      goto stopped;
+    else
+      goto select_error;
+  }
 #endif
 
   GST_DEBUG_OBJECT (fdsink, "writing %d bytes to file descriptor %d", size,
@@ -381,20 +349,18 @@ static gboolean
 gst_fd_sink_start (GstBaseSink * basesink)
 {
   GstFdSink *fdsink;
-  gint control_sock[2];
+  GstPollFD fd;
 
   fdsink = GST_FD_SINK (basesink);
   if (!gst_fd_sink_check_fd (fdsink, fdsink->fd))
     return FALSE;
 
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+  if ((fdsink->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
     goto socket_pair;
 
-  READ_SOCKET (fdsink) = control_sock[0];
-  WRITE_SOCKET (fdsink) = control_sock[1];
-
-  fcntl (READ_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (fdsink), F_SETFL, O_NONBLOCK);
+  fd.fd = fdsink->fd;
+  gst_poll_add_fd (fdsink->fdset, &fd);
+  gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
 
   return TRUE;
 
@@ -412,8 +378,10 @@ gst_fd_sink_stop (GstBaseSink * basesink)
 {
   GstFdSink *fdsink = GST_FD_SINK (basesink);
 
-  close (READ_SOCKET (fdsink));
-  close (WRITE_SOCKET (fdsink));
+  if (fdsink->fdset) {
+    gst_poll_free (fdsink->fdset);
+    fdsink->fdset = NULL;
+  }
 
   return TRUE;
 }
@@ -423,8 +391,10 @@ gst_fd_sink_unlock (GstBaseSink * basesink)
 {
   GstFdSink *fdsink = GST_FD_SINK (basesink);
 
-  GST_LOG_OBJECT (fdsink, "Sending unlock command to queue");
-  SEND_COMMAND (fdsink, CONTROL_STOP);
+  GST_LOG_OBJECT (fdsink, "Flushing");
+  GST_OBJECT_LOCK (fdsink);
+  gst_poll_set_flushing (fdsink->fdset, TRUE);
+  GST_OBJECT_UNLOCK (fdsink);
 
   return TRUE;
 }
@@ -434,20 +404,10 @@ gst_fd_sink_unlock_stop (GstBaseSink * basesink)
 {
   GstFdSink *fdsink = GST_FD_SINK (basesink);
 
-  /* read all stop commands */
-  GST_LOG_OBJECT (fdsink, "Clearing unlock command queue");
-
-  while (TRUE) {
-    gchar command;
-    int res;
-
-    READ_COMMAND (fdsink, command, res);
-    if (res < 0) {
-      GST_LOG_OBJECT (fdsink, "no more commands");
-      /* no more commands */
-      break;
-    }
-  }
+  GST_LOG_OBJECT (fdsink, "No longer flushing");
+  GST_OBJECT_LOCK (fdsink);
+  gst_poll_set_flushing (fdsink->fdset, FALSE);
+  GST_OBJECT_UNLOCK (fdsink);
 
   return TRUE;
 }
@@ -463,9 +423,20 @@ gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd)
 
   /* assign the fd */
   GST_OBJECT_LOCK (fdsink);
+  if (fdsink->fdset) {
+    GstPollFD fd = { 0 };
+
+    fd.fd = fdsink->fd;
+    gst_poll_remove_fd (fdsink->fdset, &fd);
+
+    fd.fd = new_fd;
+    gst_poll_add_fd (fdsink->fdset, &fd);
+    gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
+  }
   fdsink->fd = new_fd;
   g_free (fdsink->uri);
   fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
+
   GST_OBJECT_UNLOCK (fdsink);
 
   return TRUE;
index a2b3cc7..f92360e 100644 (file)
@@ -54,7 +54,7 @@ struct _GstFdSink {
 
   gchar *uri;
 
-  gint control_sock[2];
+  GstPoll *fdset;
 
   int fd;
   guint64 bytes_written;
index d986eb1..97ada88 100644 (file)
 
 #include "gstfdsrc.h"
 
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock the select call */
-#define CONTROL_STOP            'S'     /* stop the select call */
-#define CONTROL_SOCKETS(src)   src->control_sock
-#define WRITE_SOCKET(src)      src->control_sock[1]
-#define READ_SOCKET(src)       src->control_sock[0]
-
-#define SEND_COMMAND(src, command)          \
-G_STMT_START {                              \
-  unsigned char c; c = command;             \
-  write (WRITE_SOCKET(src), &c, 1);         \
-} G_STMT_END
-
-#define READ_COMMAND(src, command, res)        \
-G_STMT_START {                                 \
-  res = read(READ_SOCKET(src), &command, 1);   \
-} G_STMT_END
-
 #define DEFAULT_BLOCKSIZE       4096
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
@@ -193,6 +175,21 @@ gst_fd_src_update_fd (GstFdSrc * src)
 {
   struct stat stat_results;
 
+  /* we need to always update the fdset since it may not have existed when
+   * gst_fd_src_update_fd() was called earlier */
+  if (src->fdset != NULL) {
+    GstPollFD fd;
+
+    if (src->fd >= 0) {
+      fd.fd = src->fd;
+      gst_poll_remove_fd (src->fdset, &fd);
+    }
+
+    fd.fd = src->new_fd;
+    gst_poll_add_fd (src->fdset, &fd);
+    gst_poll_fd_ctl_read (src->fdset, &fd, TRUE);
+  }
+
   if (src->fd != src->new_fd) {
     GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
 
@@ -227,20 +224,13 @@ static gboolean
 gst_fd_src_start (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
-  gint control_sock[2];
 
   src->curoffset = 0;
 
-  gst_fd_src_update_fd (src);
-
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
     goto socket_pair;
 
-  READ_SOCKET (src) = control_sock[0];
-  WRITE_SOCKET (src) = control_sock[1];
-
-  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
+  gst_fd_src_update_fd (src);
 
   return TRUE;
 
@@ -258,8 +248,10 @@ gst_fd_src_stop (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
 
-  close (READ_SOCKET (src));
-  close (WRITE_SOCKET (src));
+  if (src->fdset) {
+    gst_poll_free (src->fdset);
+    src->fdset = NULL;
+  }
 
   return TRUE;
 }
@@ -269,8 +261,10 @@ gst_fd_src_unlock (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
 
-  GST_LOG_OBJECT (src, "sending unlock command");
-  SEND_COMMAND (src, CONTROL_STOP);
+  GST_LOG_OBJECT (src, "Flushing");
+  GST_OBJECT_LOCK (src);
+  gst_poll_set_flushing (src->fdset, TRUE);
+  GST_OBJECT_UNLOCK (src);
 
   return TRUE;
 }
@@ -280,22 +274,10 @@ gst_fd_src_unlock_stop (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
 
-  GST_LOG_OBJECT (src, "clearing unlock command queue");
-
-  /* read all stop commands */
-  while (TRUE) {
-    gchar command;
-    int res;
-
-    GST_LOG_OBJECT (src, "reading command");
-
-    READ_COMMAND (src, command, res);
-    if (res < 0) {
-      GST_LOG_OBJECT (src, "no more commands");
-      /* no more commands */
-      break;
-    }
-  }
+  GST_LOG_OBJECT (src, "No longer flushing");
+  GST_OBJECT_LOCK (src);
+  gst_poll_set_flushing (src->fdset, FALSE);
+  GST_OBJECT_UNLOCK (src);
 
   return TRUE;
 }
@@ -352,26 +334,22 @@ gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   guint blocksize;
 
 #ifndef HAVE_WIN32
-  fd_set readfds;
   gint retval;
 #endif
 
   src = GST_FD_SRC (psrc);
 
 #ifndef HAVE_WIN32
-  FD_ZERO (&readfds);
-  FD_SET (src->fd, &readfds);
-  FD_SET (READ_SOCKET (src), &readfds);
-
   do {
-    retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
-  } while ((retval == -1 && errno == EINTR));
-
-  if (retval == -1)
-    goto select_error;
-
-  if (FD_ISSET (READ_SOCKET (src), &readfds))
-    goto stopped;
+    retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE);
+  } while (retval == -1 && errno == EINTR);
+
+  if (retval == -1) {
+    if (errno == EBUSY)
+      goto stopped;
+    else
+      goto select_error;
+  }
 #endif
 
   blocksize = GST_BASE_SRC (src)->blocksize;
@@ -530,7 +508,7 @@ gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
   }
   g_free (protocol);
 
-  if (sscanf (uri, "fd://%d", &fd) != 1)
+  if (sscanf (uri, "fd://%d", &fd) != 1 || fd < 0)
     return FALSE;
 
   src->new_fd = fd;
index 9c3a322..2f8b419 100644 (file)
@@ -64,7 +64,7 @@ struct _GstFdSrc {
 
   gchar *uri;
 
-  gint control_sock[2];
+  GstPoll *fdset;
 
   gulong curoffset; /* current offset in file */
 };