gst/tcp/: Removed fdset and stress test, they are now known as GstPoll in core.
authorPeter Kjellerstedt <pkj@axis.com>
Thu, 28 Feb 2008 10:54:14 +0000 (10:54 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 28 Feb 2008 10:54:14 +0000 (10:54 +0000)
Original commit message from CVS:
Patch by: Peter Kjellerstedt  <pkj at axis com>
* gst/tcp/Makefile.am:
* gst/tcp/fdsetstress.c:
* gst/tcp/gstfdset.c:
* gst/tcp/gstfdset.h:
Removed fdset and stress test, they are now known as GstPoll in
core.
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init),
(gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove),
(gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_queue_buffer),
(gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start),
(gst_multi_fd_sink_stop):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close),
(gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer),
(gst_tcp_gdp_read_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init),
(gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render),
(gst_tcp_client_sink_start), (gst_tcp_client_sink_stop):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init),
(gst_tcp_client_src_create), (gst_tcp_client_src_start),
(gst_tcp_client_src_stop), (gst_tcp_client_src_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait),
(gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init),
(gst_tcp_server_src_create), (gst_tcp_server_src_start),
(gst_tcp_server_src_stop), (gst_tcp_server_src_unlock):
* gst/tcp/gsttcpserversrc.h:
Port to GstPoll. See #505417.

17 files changed:
ChangeLog
gst/tcp/Makefile.am
gst/tcp/fdsetstress.c [deleted file]
gst/tcp/gstfdset.c [deleted file]
gst/tcp/gstfdset.h [deleted file]
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gsttcp.c
gst/tcp/gsttcp.h
gst/tcp/gsttcpclientsink.c
gst/tcp/gsttcpclientsink.h
gst/tcp/gsttcpclientsrc.c
gst/tcp/gsttcpclientsrc.h
gst/tcp/gsttcpserversink.c
gst/tcp/gsttcpserversink.h
gst/tcp/gsttcpserversrc.c
gst/tcp/gsttcpserversrc.h

index 979d1bc..bf22627 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,46 @@
 
        Patch by: Peter Kjellerstedt  <pkj at axis com>
 
+       * gst/tcp/Makefile.am:
+       * gst/tcp/fdsetstress.c:
+       * gst/tcp/gstfdset.c:
+       * gst/tcp/gstfdset.h:
+       Removed fdset and stress test, they are now known as GstPoll in
+       core. 
+
+       * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init),
+       (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove),
+       (gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link),
+       (gst_multi_fd_sink_handle_client_write),
+       (gst_multi_fd_sink_queue_buffer),
+       (gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start),
+       (gst_multi_fd_sink_stop):
+       * gst/tcp/gstmultifdsink.h:
+       * gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close),
+       (gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer),
+       (gst_tcp_gdp_read_caps):
+       * gst/tcp/gsttcp.h:
+       * gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init),
+       (gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render),
+       (gst_tcp_client_sink_start), (gst_tcp_client_sink_stop):
+       * gst/tcp/gsttcpclientsink.h:
+       * gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init),
+       (gst_tcp_client_src_create), (gst_tcp_client_src_start),
+       (gst_tcp_client_src_stop), (gst_tcp_client_src_unlock):
+       * gst/tcp/gsttcpclientsrc.h:
+       * gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait),
+       (gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close):
+       * gst/tcp/gsttcpserversink.h:
+       * gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init),
+       (gst_tcp_server_src_create), (gst_tcp_server_src_start),
+       (gst_tcp_server_src_stop), (gst_tcp_server_src_unlock):
+       * gst/tcp/gsttcpserversrc.h:
+       Port to GstPoll. See #505417.
+
+2008-02-28  Wim Taymans  <wim.taymans@collabora.co.uk>
+
+       Patch by: Peter Kjellerstedt  <pkj at axis com>
+
        * gst-libs/gst/rtsp/gstrtspconnection.c:
        (gst_rtsp_connection_create), (gst_rtsp_connection_connect),
        (gst_rtsp_connection_write), (gst_rtsp_connection_read_internal),
index 02100c0..09a566e 100644 (file)
@@ -15,7 +15,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
 libgsttcp_la_SOURCES = \
        gsttcpplugin.c \
        gsttcp.c \
-       gstfdset.c \
        gstmultifdsink.c  \
        gsttcpclientsrc.c gsttcpclientsink.c \
        gsttcpserversrc.c gsttcpserversink.c
@@ -31,7 +30,6 @@ libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_GDP_LIBS) $(GST_LIBS)
 noinst_HEADERS = \
   gsttcpplugin.h \
   gsttcp.h \
-  gstfdset.h \
   gstmultifdsink.h  \
   gsttcpclientsrc.h gsttcpclientsink.h \
   gsttcpserversrc.h gsttcpserversink.h
@@ -39,9 +37,3 @@ noinst_HEADERS = \
 CLEANFILES = $(BUILT_SOURCES)
 
 EXTRA_DIST = gsttcp-marshal.list
-
-noinst_PROGRAMS = fdsetstress
-
-fdsetstress_SOURCES = fdsetstress.c gstfdset.c 
-fdsetstress_CFLAGS = $(GST_CFLAGS)
-fdsetstress_LDFLAGS = $(GST_LIBS)
diff --git a/gst/tcp/fdsetstress.c b/gst/tcp/fdsetstress.c
deleted file mode 100644 (file)
index 8bc13e3..0000000
+++ /dev/null
@@ -1,179 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#include <stdlib.h>
-#include <gst/gst.h>
-
-#include "gstfdset.h"
-
-static GstFDSet *set;
-static GList *fds = NULL;
-static GMutex *fdlock;
-static GTimer *timer;
-
-#define MAX_THREADS  100
-
-static void
-mess_some_more ()
-{
-  GList *walk;
-  gint random;
-  gint removed = 0;
-
-  g_mutex_lock (fdlock);
-
-  for (walk = fds; walk;) {
-    GstFD *fd = (GstFD *) walk->data;
-
-    walk = g_list_next (walk);
-
-    random = (gint) (10.0 * rand () / (RAND_MAX + 1.0));
-    switch (random) {
-      case 0:
-      {
-        /*
-           GstFD *newfd = g_new0 (GstFD, 1);
-
-           gst_fdset_add_fd (set, newfd);
-           fds = g_list_prepend (fds, newfd);
-         */
-        break;
-      }
-      case 1:
-        if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) {
-          gst_fdset_remove_fd (set, fd);
-          fds = g_list_remove (fds, fd);
-          g_free (fd);
-          removed++;
-        }
-        break;
-
-      case 2:
-        gst_fdset_fd_ctl_write (set, fd, TRUE);
-        break;
-      case 3:
-        gst_fdset_fd_ctl_write (set, fd, FALSE);
-        break;
-      case 4:
-        gst_fdset_fd_ctl_read (set, fd, TRUE);
-        break;
-      case 5:
-        gst_fdset_fd_ctl_read (set, fd, FALSE);
-        break;
-
-      case 6:
-        gst_fdset_fd_has_closed (set, fd);
-        break;
-      case 7:
-        gst_fdset_fd_has_error (set, fd);
-        break;
-      case 8:
-        gst_fdset_fd_can_read (set, fd);
-        break;
-      case 9:
-        gst_fdset_fd_can_write (set, fd);
-        break;
-      default:
-        g_assert_not_reached ();
-        break;
-    }
-  }
-  if (g_list_length (fds) < 900) {
-    random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0));
-    while (random) {
-      GstFD *newfd = g_new0 (GstFD, 1);
-
-      gst_fdset_add_fd (set, newfd);
-      fds = g_list_prepend (fds, newfd);
-      random--;
-    }
-  }
-
-  g_mutex_unlock (fdlock);
-}
-
-void *
-run_test (void *threadid)
-{
-  gint id = GPOINTER_TO_INT (threadid);
-
-  while (TRUE) {
-    if (id == 0) {
-      gint res = gst_fdset_wait (set, 10);
-
-      if (res < 0) {
-        g_print ("error %d %s\n", errno, g_strerror (errno));
-      }
-    } else {
-      mess_some_more ();
-      if (g_timer_elapsed (timer, NULL) > 0.5) {
-        g_mutex_lock (fdlock);
-        g_print ("active fds :%d\n", g_list_length (fds));
-        g_timer_start (timer);
-        g_mutex_unlock (fdlock);
-      }
-      g_usleep (1);
-    }
-  }
-
-  g_thread_exit (NULL);
-  return NULL;
-}
-
-gint
-main (gint argc, gchar * argv[])
-{
-  GThread *threads[MAX_THREADS];
-  gint num_threads;
-  gint t;
-
-  gst_init (&argc, &argv);
-
-  fdlock = g_mutex_new ();
-  timer = g_timer_new ();
-
-  if (argc != 2) {
-    g_print ("usage: %s <num_threads>\n", argv[0]);
-    exit (-1);
-  }
-
-  num_threads = atoi (argv[1]);
-
-  set = gst_fdset_new (GST_FDSET_MODE_POLL);
-
-  for (t = 0; t < num_threads; t++) {
-    GError *error = NULL;
-
-    threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error);
-    if (error) {
-      printf ("ERROR: g_thread_create() %s\n", error->message);
-      exit (-1);
-    }
-  }
-  printf ("main(): Created %d threads.\n", t);
-
-  for (t = 0; t < num_threads; t++) {
-    g_thread_join (threads[t]);
-  }
-
-  gst_fdset_free (set);
-
-  return 0;
-}
diff --git a/gst/tcp/gstfdset.c b/gst/tcp/gstfdset.c
deleted file mode 100644 (file)
index 5f22238..0000000
+++ /dev/null
@@ -1,532 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
- *
- * gsttcpfdset.h: fdset datastructure
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#define MIN_POLLFDS     32
-#define INIT_POLLFDS    MIN_POLLFDS
-
-#include <sys/poll.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <unistd.h>
-/* OS/X needs this because of bad headers */
-#include <string.h>
-
-#include "gstfdset.h"
-
-GType
-gst_fdset_mode_get_type (void)
-{
-  static GType fdset_mode_type = 0;
-  static const GEnumValue fdset_mode[] = {
-    {GST_FDSET_MODE_SELECT, "Select", "select"},
-    {GST_FDSET_MODE_POLL, "Poll", "poll"},
-    {GST_FDSET_MODE_EPOLL, "EPoll", "epoll"},
-    {0, NULL, NULL},
-  };
-
-  if (!fdset_mode_type) {
-    fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
-  }
-  return fdset_mode_type;
-}
-
-struct _GstFDSet
-{
-  GstFDSetMode mode;
-
-  /* for poll */
-  struct pollfd *testpollfds;
-  gint last_testpollfds;
-  gint testsize;
-
-  struct pollfd *pollfds;
-  gint size;
-  gint free;
-  gint last_pollfds;
-  GMutex *poll_lock;
-
-  /* for select */
-  fd_set readfds, writefds;     /* input */
-  fd_set testreadfds, testwritefds;     /* output */
-};
-
-static gint
-nearest_pow (gint num)
-{
-  /* hacker's delight page 48 */
-  num -= 1;
-  num |= num >> 1;
-  num |= num >> 2;
-  num |= num >> 4;
-  num |= num >> 8;
-  num |= num >> 16;
-
-  return num + 1;
-}
-
-/* resize a given pollfd array from old_size number of items
- * to new_size number of items. Also initializes the new elements
- * with the default values. */
-static struct pollfd *
-resize (struct pollfd *fds, gint old_size, gint new_size)
-{
-  struct pollfd *res;
-  gint i;
-
-  res = g_realloc (fds, new_size * sizeof (struct pollfd));
-  for (i = old_size; i < new_size; i++) {
-    res[i].fd = -1;
-    res[i].events = 0;
-    res[i].revents = 0;
-  }
-  return res;
-}
-
-static void
-ensure_size (GstFDSet * set, gint len)
-{
-  if (len > set->size) {
-    len = nearest_pow (len);
-    len = MAX (len, MIN_POLLFDS);
-
-    set->pollfds = resize (set->pollfds, set->size, len);
-    set->size = len;
-  }
-}
-
-GstFDSet *
-gst_fdset_new (GstFDSetMode mode)
-{
-  GstFDSet *nset;
-
-  nset = g_new0 (GstFDSet, 1);
-  nset->mode = mode;
-
-  switch (mode) {
-    case GST_FDSET_MODE_SELECT:
-      FD_ZERO (&nset->readfds);
-      FD_ZERO (&nset->writefds);
-      break;
-    case GST_FDSET_MODE_POLL:
-      nset->pollfds = NULL;
-      nset->testpollfds = NULL;
-      nset->free = 0;
-      nset->last_pollfds = 0;
-      nset->poll_lock = g_mutex_new ();
-      ensure_size (nset, MIN_POLLFDS);
-      break;
-    case GST_FDSET_MODE_EPOLL:
-      g_warning ("implement EPOLL mode in GstFDSet");
-      break;
-    default:
-      break;
-  }
-  return nset;
-}
-
-void
-gst_fdset_free (GstFDSet * set)
-{
-  g_return_if_fail (set != NULL);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      break;
-    case GST_FDSET_MODE_POLL:
-      g_free (set->testpollfds);
-      g_free (set->pollfds);
-      g_mutex_free (set->poll_lock);
-      break;
-    case GST_FDSET_MODE_EPOLL:
-      g_warning ("implement EPOLL mode in GstFDSet");
-      break;
-    default:
-      break;
-  }
-  g_free (set);
-}
-
-
-void
-gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
-{
-  g_return_if_fail (set != NULL);
-
-  g_warning ("implement set_mode in GstFDSet");
-}
-
-GstFDSetMode
-gst_fdset_get_mode (GstFDSet * set)
-{
-  g_return_val_if_fail (set != NULL, FALSE);
-
-  return set->mode;
-}
-
-gboolean
-gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      res = TRUE;
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      struct pollfd *nfd;
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-
-      ensure_size (set, set->last_pollfds + 1);
-
-      idx = set->free;
-      if (idx == -1) {
-        /* find free space */
-        while (idx < set->last_pollfds) {
-          idx++;
-          if (set->pollfds[idx].fd == -1)
-            break;
-        }
-      }
-      nfd = &set->pollfds[idx];
-
-      nfd->fd = fd->fd;
-      nfd->events = POLLERR | POLLNVAL | POLLHUP;
-      nfd->revents = 0;
-
-      /* see if we have one fd more */
-      set->last_pollfds = MAX (idx + 1, set->last_pollfds);
-      fd->idx = idx;
-      set->free = -1;
-      g_mutex_unlock (set->poll_lock);
-
-      res = TRUE;
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-gboolean
-gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      /* nothing */
-      FD_CLR (fd->fd, &set->writefds);
-      FD_CLR (fd->fd, &set->readfds);
-      res = TRUE;
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      g_mutex_lock (set->poll_lock);
-
-      /* FIXME on some platforms poll doesn't ignore the fd
-       * when set to -1 */
-      set->pollfds[fd->idx].fd = -1;
-      set->pollfds[fd->idx].events = 0;
-      set->pollfds[fd->idx].revents = 0;
-
-      /* if we removed the last fd, we can lower the last_pollfds */
-      if (fd->idx + 1 == set->last_pollfds) {
-        set->last_pollfds--;
-      }
-      fd->idx = -1;
-
-      if (set->free == -1) {
-        set->free = fd->idx;
-      } else {
-        set->free = MIN (set->free, fd->idx);
-      }
-      g_mutex_unlock (set->poll_lock);
-      res = TRUE;
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-void
-gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
-{
-  g_return_if_fail (set != NULL);
-  g_return_if_fail (fd != NULL);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      if (active)
-        FD_SET (fd->fd, &set->writefds);
-      else
-        FD_CLR (fd->fd, &set->writefds);
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-
-      idx = fd->idx;
-      if (idx >= 0) {
-        gint events = set->pollfds[idx].events;
-
-        if (active)
-          events |= POLLOUT;
-        else
-          events &= ~POLLOUT;
-
-        set->pollfds[idx].events = events;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-}
-
-void
-gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
-{
-  g_return_if_fail (set != NULL);
-  g_return_if_fail (fd != NULL);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      if (active)
-        FD_SET (fd->fd, &set->readfds);
-      else
-        FD_CLR (fd->fd, &set->readfds);
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-
-      idx = fd->idx;
-      if (idx >= 0) {
-        gint events = set->pollfds[idx].events;
-
-        if (active)
-          events |= (POLLIN | POLLPRI);
-        else
-          events &= ~(POLLIN | POLLPRI);
-
-        set->pollfds[idx].events = events;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-}
-
-gboolean
-gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      res = FALSE;
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-      idx = fd->idx;
-
-      if (idx >= 0 && idx < set->last_testpollfds) {
-        res = (set->testpollfds[idx].revents & POLLHUP) != 0;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-gboolean
-gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      res = FALSE;
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-      idx = fd->idx;
-
-      if (idx >= 0 && idx < set->last_testpollfds) {
-        res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-gboolean
-gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      res = FD_ISSET (fd->fd, &set->testreadfds);
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-      idx = fd->idx;
-
-      if (idx >= 0 && idx < set->last_testpollfds) {
-        res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-gboolean
-gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
-{
-  gboolean res = FALSE;
-
-  g_return_val_if_fail (set != NULL, FALSE);
-  g_return_val_if_fail (fd != NULL, FALSE);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-      res = FD_ISSET (fd->fd, &set->testwritefds);
-      break;
-    case GST_FDSET_MODE_POLL:
-    {
-      gint idx;
-
-      g_mutex_lock (set->poll_lock);
-      idx = fd->idx;
-
-      if (idx >= 0 && idx < set->last_testpollfds) {
-        res = (set->testpollfds[idx].revents & POLLOUT) != 0;
-      }
-      g_mutex_unlock (set->poll_lock);
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-  return res;
-}
-
-gint
-gst_fdset_wait (GstFDSet * set, int timeout)
-{
-  int res = -1;
-
-  g_return_val_if_fail (set != NULL, -1);
-
-  switch (set->mode) {
-    case GST_FDSET_MODE_SELECT:
-    {
-      struct timeval tv;
-      struct timeval *tvptr;
-
-      set->testreadfds = set->readfds;
-      set->testwritefds = set->writefds;
-
-      if (timeout > 0) {
-        tv.tv_sec = timeout / 1000;
-        tv.tv_usec = timeout % 1000;
-
-        tvptr = &tv;
-      } else {
-        tvptr = NULL;
-      }
-      res =
-          select (FD_SETSIZE, &set->testreadfds, &set->testwritefds,
-          (fd_set *) 0, tvptr);
-      break;
-    }
-    case GST_FDSET_MODE_POLL:
-    {
-      g_mutex_lock (set->poll_lock);
-      if (set->testsize != set->size) {
-        set->testpollfds = resize (set->testpollfds, set->testsize, set->size);
-        set->testsize = set->size;
-      }
-      set->last_testpollfds = set->last_pollfds;
-      memcpy (set->testpollfds, set->pollfds,
-          sizeof (struct pollfd) * set->last_testpollfds);
-      g_mutex_unlock (set->poll_lock);
-
-      res = poll (set->testpollfds, set->last_testpollfds, timeout);
-
-      break;
-    }
-    case GST_FDSET_MODE_EPOLL:
-      break;
-  }
-
-  return res;
-}
diff --git a/gst/tcp/gstfdset.h b/gst/tcp/gstfdset.h
deleted file mode 100644 (file)
index 1f93490..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
- *
- * gstfdset.h: fdset datastructure
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifndef __GST_FDSET_H__
-#define __GST_FDSET_H__
-
-#include <gst/gst.h>
-
-G_BEGIN_DECLS
-
-typedef struct _GstFDSet GstFDSet;
-
-typedef struct {
-  int fd;
-  gint idx;
-} GstFD;
-
-typedef enum {
-  GST_FDSET_MODE_SELECT,
-  GST_FDSET_MODE_POLL,
-  GST_FDSET_MODE_EPOLL
-} GstFDSetMode;
-
-#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
-GType gst_fdset_mode_get_type (void);
-
-
-GstFDSet*       gst_fdset_new (GstFDSetMode mode);
-void            gst_fdset_free (GstFDSet *set);
-
-void            gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode);
-GstFDSetMode    gst_fdset_get_mode (GstFDSet *set);
-
-gboolean        gst_fdset_add_fd (GstFDSet *set, GstFD *fd); 
-gboolean        gst_fdset_remove_fd (GstFDSet *set, GstFD *fd); 
-
-void            gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active); 
-void            gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active); 
-
-gboolean        gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd); 
-gboolean        gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd); 
-gboolean        gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd); 
-gboolean        gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd); 
-
-gint            gst_fdset_wait (GstFDSet *set, int timeout);
-
-G_END_DECLS
-
-#endif /* __GST_FDSET_H__ */
index d115d80..6f6a092 100644 (file)
 
 #define NOT_IMPLEMENTED 0
 
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock or restart the select call */
-#define CONTROL_RESTART         'R'     /* restart the select call */
-#define CONTROL_STOP            'S'     /* stop the select call */
-#define CONTROL_SOCKETS(sink)   sink->control_sock
-#define WRITE_SOCKET(sink)      sink->control_sock[1]
-#define READ_SOCKET(sink)       sink->control_sock[0]
-
-#define SEND_COMMAND(sink, command)             \
-G_STMT_START {                                  \
-  unsigned char c; c = command;                 \
-  write (WRITE_SOCKET(sink).fd, &c, 1);         \
-} G_STMT_END
-
-#define READ_COMMAND(sink, command, res)        \
-G_STMT_START {                                  \
-  res = read(READ_SOCKET(sink).fd, &command, 1);\
-} G_STMT_END
-
 /* elementfactory information */
 static const GstElementDetails gst_multi_fd_sink_details =
 GST_ELEMENT_DETAILS ("Multi filedescriptor sink",
@@ -188,7 +169,7 @@ enum
 
 /* this is really arbitrarily chosen */
 #define DEFAULT_PROTOCOL                GST_TCP_PROTOCOL_NONE
-#define DEFAULT_MODE                    GST_FDSET_MODE_POLL
+#define DEFAULT_MODE                    GST_POLL_MODE_AUTO
 #define DEFAULT_BUFFERS_MAX             -1
 #define DEFAULT_BUFFERS_SOFT_MAX        -1
 #define DEFAULT_TIME_MIN                -1
@@ -380,7 +361,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass)
           GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, PROP_MODE,
       g_param_spec_enum ("mode", "Mode",
-          "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
+          "The mode for selecting activity on the fds", GST_TYPE_POLL_MODE,
           DEFAULT_MODE, G_PARAM_READWRITE));
 
   g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
@@ -733,12 +714,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   /* set the socket to non blocking */
   res = fcntl (fd, F_SETFL, O_NONBLOCK);
   /* we always read from a client */
-  gst_fdset_add_fd (sink->fdset, &client->fd);
+  gst_poll_add_fd (sink->fdset, &client->fd);
 
   /* we don't try to read from write only fds */
   flags = fcntl (fd, F_GETFL, 0);
   if ((flags & O_ACCMODE) != O_WRONLY) {
-    gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+    gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
   }
   /* figure out the mode, can't use send() for non sockets */
   res = fstat (fd, &statbuf);
@@ -746,7 +727,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
     client->is_socket = TRUE;
   }
 
-  SEND_COMMAND (sink, CONTROL_RESTART);
+  gst_poll_restart (sink->fdset);
 
   CLIENTS_UNLOCK (sink);
 
@@ -807,7 +788,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
 
     client->status = GST_CLIENT_STATUS_REMOVED;
     gst_multi_fd_sink_remove_client_link (sink, clink);
-    SEND_COMMAND (sink, CONTROL_RESTART);
+    gst_poll_restart (sink->fdset);
   } else {
     GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
   }
@@ -877,7 +858,7 @@ restart:
     client->status = GST_CLIENT_STATUS_REMOVED;
     gst_multi_fd_sink_remove_client_link (sink, clients);
   }
-  SEND_COMMAND (sink, CONTROL_RESTART);
+  gst_poll_restart (sink->fdset);
   CLIENTS_UNLOCK (sink);
 }
 
@@ -1010,7 +991,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
       break;
   }
 
-  gst_fdset_remove_fd (sink->fdset, &client->fd);
+  gst_poll_remove_fd (sink->fdset, &client->fd);
 
   g_get_current_time (&now);
   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
@@ -1877,7 +1858,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
       if (client->bufpos == -1) {
         /* client is too fast, remove from write queue until new buffer is
          * available */
-        gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+        gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
         /* if we flushed out all of the client buffers, we can stop */
         if (client->flushcount == 0)
           goto flushed;
@@ -1898,7 +1879,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
             client->bufpos = position;
           } else {
             /* cannot send data to this client yet */
-            gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+            gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
             return TRUE;
           }
         }
@@ -2164,7 +2145,7 @@ restart:
     } else if (client->bufpos == 0 || client->new_connection) {
       /* can send data to this client now. need to signal the select thread that
        * the fd_set changed */
-      gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
+      gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
       need_signal = TRUE;
     }
     /* keep track of maximum buffer usage */
@@ -2241,7 +2222,7 @@ restart:
 
   /* and send a signal to thread if fd_set changed */
   if (need_signal) {
-    SEND_COMMAND (sink, CONTROL_RESTART);
+    gst_poll_restart (sink->fdset);
   }
 }
 
@@ -2265,8 +2246,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
 
   do {
-    gboolean stop = FALSE;
-
     try_again = FALSE;
 
     /* check for:
@@ -2274,7 +2253,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
      * - client socket input (ie, clients saying goodbye)
      * - client socket output (ie, client reads)          */
     GST_LOG_OBJECT (sink, "waiting on action on fdset");
-    result = gst_fdset_wait (sink->fdset, -1);
+    result = gst_poll_wait (sink->fdset, GST_CLOCK_TIME_NONE);
 
     /* < 0 is an error, 0 just means a timeout happened, which is impossible */
     if (result < 0) {
@@ -2317,8 +2296,11 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
          * are not valid */
         try_again = TRUE;
       } else if (errno == EINTR) {
-        /* interrupted system call, just redo the select */
+        /* interrupted system call, just redo the wait */
         try_again = TRUE;
+      } else if (errno == EBUSY) {
+        /* the call to gst_poll_wait() was flushed */
+        return;
       } else {
         /* this is quite bad... */
         GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
@@ -2327,46 +2309,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
       }
     } else {
       GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
-      /* read all commands */
-      if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
-        GST_LOG_OBJECT (sink, "have a command");
-        while (TRUE) {
-          gchar command;
-          int res;
-
-          READ_COMMAND (sink, command, res);
-          if (res <= 0) {
-            GST_LOG_OBJECT (sink, "no more commands");
-            /* no more commands */
-            break;
-          }
-
-          switch (command) {
-            case CONTROL_RESTART:
-              GST_LOG_OBJECT (sink, "restart");
-              /* need to restart the select call as the fd_set changed */
-              /* if other file descriptors than the READ_SOCKET had activity,
-               * we don't restart just yet, but handle the other clients first
-               */
-              if (result == 1)
-                try_again = TRUE;
-              break;
-            case CONTROL_STOP:
-              /* break out of the select loop */
-              GST_LOG_OBJECT (sink, "stop");
-              /* stop this function */
-              stop = TRUE;
-              break;
-            default:
-              GST_WARNING_OBJECT (sink, "unkown");
-              g_warning ("multifdsink: unknown control message received");
-              break;
-          }
-        }
-      }
-    }
-    if (stop) {
-      return;
     }
   } while (try_again);
 
@@ -2396,25 +2338,25 @@ restart2:
       continue;
     }
 
-    if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
+    if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
       client->status = GST_CLIENT_STATUS_CLOSED;
       gst_multi_fd_sink_remove_client_link (sink, clients);
       continue;
     }
-    if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
-      GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd.fd);
+    if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
+      GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
       client->status = GST_CLIENT_STATUS_ERROR;
       gst_multi_fd_sink_remove_client_link (sink, clients);
       continue;
     }
-    if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
+    if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
       /* handle client read */
       if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
         gst_multi_fd_sink_remove_client_link (sink, clients);
         continue;
       }
     }
-    if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
+    if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
       /* handle client write */
       if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
         gst_multi_fd_sink_remove_client_link (sink, clients);
@@ -2679,7 +2621,6 @@ static gboolean
 gst_multi_fd_sink_start (GstBaseSink * bsink)
 {
   GstMultiFdSinkClass *fclass;
-  int control_socket[2];
   GstMultiFdSink *this;
 
   if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
@@ -2689,20 +2630,9 @@ gst_multi_fd_sink_start (GstBaseSink * bsink)
   fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
 
   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
-  this->fdset = gst_fdset_new (this->mode);
-
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
+  if ((this->fdset = gst_poll_new (this->mode, TRUE)) == NULL)
     goto socket_pair;
 
-  READ_SOCKET (this).fd = control_socket[0];
-  WRITE_SOCKET (this).fd = control_socket[1];
-
-  gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
-  gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
-
-  fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
-
   this->streamheader = NULL;
   this->bytes_to_serve = 0;
   this->bytes_served = 0;
@@ -2750,7 +2680,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
 
   this->running = FALSE;
 
-  SEND_COMMAND (this, CONTROL_STOP);
+  gst_poll_set_flushing (this->fdset, TRUE);
   if (this->thread) {
     GST_DEBUG_OBJECT (this, "joining thread");
     g_thread_join (this->thread);
@@ -2761,9 +2691,6 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
   /* free the clients */
   gst_multi_fd_sink_clear (this);
 
-  close (READ_SOCKET (this).fd);
-  close (WRITE_SOCKET (this).fd);
-
   if (this->streamheader) {
     g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
     g_slist_free (this->streamheader);
@@ -2774,8 +2701,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink)
     fclass->close (this);
 
   if (this->fdset) {
-    gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
-    gst_fdset_free (this->fdset);
+    gst_poll_free (this->fdset);
     this->fdset = NULL;
   }
   g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);
index baf70d7..a4f557e 100644 (file)
@@ -28,7 +28,6 @@
 G_BEGIN_DECLS
 
 #include "gsttcp.h"
-#include "gstfdset.h"
 
 #define GST_TYPE_MULTI_FD_SINK \
   (gst_multi_fd_sink_get_type())
@@ -140,7 +139,7 @@ typedef enum
 /* structure for a client
  */
 typedef struct {
-  GstFD fd;
+  GstPollFD fd;
 
   gint bufpos;                  /* position of this client in the global queue */
   gint flushcount;              /* the remaining number of buffers to flush out or -1 if the 
@@ -201,10 +200,8 @@ struct _GstMultiFdSink {
   GHashTable *fd_hash;  /* index on fd to client */
   guint clients_cookie; /* Cookie to detect changes to the clients list */
 
-  GstFDSetMode mode;
-  GstFDSet *fdset;
-
-  GstFD control_sock[2];/* sockets for controlling the select call */
+  GstPollMode mode;
+  GstPoll *fdset;
 
   GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
   gboolean previous_buffer_in_caps;
@@ -259,7 +256,7 @@ struct _GstMultiFdSinkClass {
 
   /* vtable */
   gboolean (*init)   (GstMultiFdSink *sink);
-  gboolean (*wait)   (GstMultiFdSink *sink, GstFDSet *set);
+  gboolean (*wait)   (GstMultiFdSink *sink, GstPoll *set);
   gboolean (*close)  (GstMultiFdSink *sink);
   void (*removed) (GstMultiFdSink *sink, int fd);
 
index 1ee91fb..584a25a 100644 (file)
@@ -29,7 +29,6 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
-#include <string.h>             /* memset, in FD_ZERO macro */
 #include <unistd.h>
 #include <sys/ioctl.h>
 
@@ -128,30 +127,24 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count)
  */
 static GstFlowReturn
 gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
-    int cancel_fd)
+    GstPoll * fdset)
 {
-  fd_set testfds;
-  int maxfdp1;
   ssize_t n;
   size_t bytes_read;
   int num_to_read;
+  int ret;
 
   bytes_read = 0;
 
   while (bytes_read < count) {
     /* do a blocking select on the socket */
-    FD_ZERO (&testfds);
-    FD_SET (socket, &testfds);
-    if (cancel_fd >= 0)
-      FD_SET (cancel_fd, &testfds);
-    maxfdp1 = MAX (socket, cancel_fd) + 1;
-
     /* no action (0) is an error too in our case */
-    if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
-      goto select_error;
-
-    if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
-      goto cancelled;
+    if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+      if (ret == -1 && errno == EBUSY)
+        goto cancelled;
+      else
+        goto select_error;
+    }
 
     /* ask how much is available for reading on the socket */
     if (ioctl (socket, FIONREAD, &num_to_read) < 0)
@@ -216,10 +209,12 @@ short_read:
 
 /* close the socket and reset the fd.  Used to clean up after errors. */
 void
-gst_tcp_socket_close (int *socket)
+gst_tcp_socket_close (GstPollFD * socket)
 {
-  close (*socket);
-  *socket = -1;
+  if (socket->fd >= 0) {
+    close (socket->fd);
+    socket->fd = -1;
+  }
 }
 
 /* read a buffer from the given socket
@@ -229,30 +224,23 @@ gst_tcp_socket_close (int *socket)
  *         EOS
  */
 GstFlowReturn
-gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
+gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
     GstBuffer ** buf)
 {
-  fd_set testfds;
   int ret;
-  int maxfdp1;
   ssize_t bytes_read;
   int readsize;
 
   *buf = NULL;
 
   /* do a blocking select on the socket */
-  FD_ZERO (&testfds);
-  FD_SET (socket, &testfds);
-  if (cancel_fd >= 0)
-    FD_SET (cancel_fd, &testfds);
-  maxfdp1 = MAX (socket, cancel_fd) + 1;
-
   /* no action (0) is an error too in our case */
-  if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
-    goto select_error;
-
-  if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
-    goto cancelled;
+  if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+    if (ret == -1 && errno == EBUSY)
+      goto cancelled;
+    else
+      goto select_error;
+  }
 
   /* ask how much is available for reading on the socket */
   if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
@@ -326,7 +314,7 @@ short_read:
  *         EOS
  */
 GstFlowReturn
-gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
+gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
     GstBuffer ** buf)
 {
   GstFlowReturn ret;
@@ -338,8 +326,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
   *buf = NULL;
   header = g_malloc (GST_DP_HEADER_LENGTH);
 
-  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
-      cancel_fd);
+  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
 
   if (ret != GST_FLOW_OK)
     goto header_read_error;
@@ -357,7 +344,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
   g_free (header);
 
   ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
-      GST_BUFFER_SIZE (*buf), cancel_fd);
+      GST_BUFFER_SIZE (*buf), fdset);
 
   if (ret != GST_FLOW_OK)
     goto data_read_error;
@@ -394,7 +381,7 @@ data_read_error:
 }
 
 GstFlowReturn
-gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
+gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset,
     GstCaps ** caps)
 {
   GstFlowReturn ret;
@@ -408,8 +395,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
   *caps = NULL;
   header = g_malloc (GST_DP_HEADER_LENGTH);
 
-  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
-      cancel_fd);
+  ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
 
   if (ret != GST_FLOW_OK)
     goto header_read_error;
@@ -429,7 +415,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
       "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload",
       payload_length);
 
-  ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
+  ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset);
 
   if (ret != GST_FLOW_OK)
     goto payload_read_error;
index 2c15de3..9a89667 100644 (file)
@@ -44,14 +44,14 @@ gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
 
 gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
 
-void gst_tcp_socket_close (int *socket);
+void gst_tcp_socket_close (GstPollFD *socket);
 
-GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
 
-GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
-GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
+GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
+GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps);
 
-GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset);
 
 gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
 gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
index 32a95c4..8dc1d65 100644 (file)
@@ -160,7 +160,7 @@ gst_tcp_client_sink_init (GstTCPClientSink * this)
   this->host = g_strdup (TCP_DEFAULT_HOST);
   this->port = TCP_DEFAULT_PORT;
 
-  this->sock_fd = -1;
+  this->sock_fd.fd = -1;
   this->protocol = GST_TCP_PROTOCOL_NONE;
   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
 }
@@ -198,8 +198,8 @@ gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
         GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
         g_free (string);
 
-        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
-                TRUE, sink->host, sink->port))
+        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd,
+                caps, TRUE, sink->host, sink->port))
           goto gdp_write_error;
 
         sink->caps_sent = TRUE;
@@ -241,7 +241,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
       break;
     case GST_TCP_PROTOCOL_GDP:
       GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
-      if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
+      if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf,
               TRUE, sink->host, sink->port))
         goto gdp_write_error;
       break;
@@ -250,7 +250,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
   }
 
   /* write buffer data */
-  wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
+  wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size);
 
   if (wrote < size)
     goto write_error;
@@ -348,12 +348,12 @@ gst_tcp_client_sink_start (GstTCPClientSink * this)
   /* create sending client socket */
   GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
       this->port);
-  if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+  if ((this->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
     return FALSE;
   }
   GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d",
-      this->sock_fd);
+      this->sock_fd.fd);
 
   /* look up name if we need to */
   ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
@@ -371,7 +371,7 @@ gst_tcp_client_sink_start (GstTCPClientSink * this)
   g_free (ip);
 
   GST_DEBUG_OBJECT (this, "connecting to server");
-  ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
+  ret = connect (this->sock_fd.fd, (struct sockaddr *) &this->server_sin,
       sizeof (this->server_sin));
 
   if (ret) {
@@ -405,10 +405,7 @@ gst_tcp_client_sink_stop (GstTCPClientSink * this)
   if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
     return TRUE;
 
-  if (this->sock_fd != -1) {
-    close (this->sock_fd);
-    this->sock_fd = -1;
-  }
+  gst_tcp_socket_close (&this->sock_fd);
 
   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
 
index 873e2c6..4399870 100644 (file)
@@ -73,7 +73,7 @@ struct _GstTCPClientSink {
   struct sockaddr_in server_sin;
 
   /* socket */
-  int sock_fd;
+  GstPollFD sock_fd;
 
   size_t data_written; /* how much bytes have we written ? */
   GstTCPProtocol protocol; /* used with the protocol enum */
index 87caea5..89a3a24 100644 (file)
 #include <fcntl.h>
 
 
-/* control stuff stolen from fdsrc */
-#define CONTROL_STOP            'S'     /* stop the select call */
-#define CONTROL_SOCKETS(o)      o->control_fds
-#define WRITE_SOCKET(o)         o->control_fds[1]
-#define READ_SOCKET(o)          o->control_fds[0]
-
-#define SEND_COMMAND(o, command)          \
-G_STMT_START {                              \
-  unsigned char c; c = command;             \
-  write (WRITE_SOCKET(o), &c, 1);         \
-} G_STMT_END
-
-#define READ_COMMAND(o, command, res)        \
-G_STMT_START {                                 \
-  res = read(READ_SOCKET(o), &command, 1);   \
-} G_STMT_END
-
-
 GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
 #define GST_CAT_DEFAULT tcpclientsrc_debug
 
@@ -150,13 +132,10 @@ gst_tcp_client_src_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class)
 {
   this->port = TCP_DEFAULT_PORT;
   this->host = g_strdup (TCP_DEFAULT_HOST);
-  this->sock_fd = -1;
+  this->sock_fd.fd = -1;
   this->protocol = GST_TCP_PROTOCOL_NONE;
   this->caps = NULL;
 
-  READ_SOCKET (this) = -1;
-  WRITE_SOCKET (this) = -1;
-
   gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
 
   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
@@ -207,8 +186,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
   /* read the buffer header if we're using a protocol */
   switch (src->protocol) {
     case GST_TCP_PROTOCOL_NONE:
-      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd,
-          READ_SOCKET (src), outbuf);
+      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
+          src->fdset, outbuf);
       break;
 
     case GST_TCP_PROTOCOL_GDP:
@@ -217,8 +196,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         GstCaps *caps;
 
         GST_DEBUG_OBJECT (src, "getting caps through GDP");
-        ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd,
-            READ_SOCKET (src), &caps);
+        ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd,
+            src->fdset, &caps);
 
         if (ret != GST_FLOW_OK)
           goto no_caps;
@@ -227,8 +206,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
         src->caps = caps;
       }
 
-      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd,
-          READ_SOCKET (src), outbuf);
+      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
+          src->fdset, outbuf);
       break;
     default:
       /* need to assert as buf == NULL */
@@ -323,22 +302,18 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc)
   gchar *ip;
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
-  /* create the control sockets before anything */
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
+  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
     goto socket_pair;
 
-  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-
   /* create receiving client socket */
   GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
       src->host, src->port);
 
-  if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+  if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
     goto no_socket;
 
   GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
-      src->sock_fd);
+      src->sock_fd.fd);
   GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
 
   /* look up name if we need to */
@@ -355,7 +330,7 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc)
   g_free (ip);
 
   GST_DEBUG_OBJECT (src, "connecting to server");
-  ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
+  ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
       sizeof (src->server_sin));
 
   if (ret) {
@@ -406,10 +381,13 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc)
   src = GST_TCP_CLIENT_SRC (bsrc);
 
   GST_DEBUG_OBJECT (src, "closing socket");
-  if (src->sock_fd != -1) {
-    close (src->sock_fd);
-    src->sock_fd = -1;
+
+  if (src->fdset != NULL) {
+    gst_poll_free (src->fdset);
+    src->fdset = NULL;
   }
+
+  gst_tcp_socket_close (&src->sock_fd);
   src->caps_received = FALSE;
   if (src->caps) {
     gst_caps_unref (src->caps);
@@ -417,11 +395,6 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc)
   }
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
 
-  close (READ_SOCKET (src));
-  close (WRITE_SOCKET (src));
-  READ_SOCKET (src) = -1;
-  WRITE_SOCKET (src) = -1;
-
   return TRUE;
 }
 
@@ -431,7 +404,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
 {
   GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
 
-  SEND_COMMAND (src, CONTROL_STOP);
+  gst_poll_set_flushing (src->fdset, TRUE);
 
   return TRUE;
 }
index d44054c..24d31e8 100644 (file)
@@ -64,8 +64,8 @@ struct _GstTCPClientSrc {
   struct sockaddr_in server_sin;
 
   /* socket */
-  int sock_fd;
-  int control_fds[2];
+  GstPollFD sock_fd;
+  GstPoll *fdset;
 
   GstTCPProtocol protocol; /* protocol used for reading data */
   gboolean caps_received;      /* if we have received caps yet */
index a2c782e..bc1001e 100644 (file)
@@ -63,7 +63,7 @@ enum
 static void gst_tcp_server_sink_finalize (GObject * gobject);
 
 static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink,
-    GstFDSet * set);
+    GstPoll * set);
 static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this);
 static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this);
 static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd);
@@ -184,11 +184,11 @@ gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd)
 }
 
 static gboolean
-gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstFDSet * set)
+gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set)
 {
   GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
 
-  if (gst_fdset_fd_can_read (set, &this->server_sock)) {
+  if (gst_poll_fd_can_read (set, &this->server_sock)) {
     /* handle new client connection on server socket */
     if (!gst_tcp_server_sink_handle_server_read (this)) {
       GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
@@ -270,7 +270,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
   ret = 1;
   if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR,
           (void *) &ret, sizeof (ret)) < 0) {
-    gst_tcp_socket_close (&this->server_sock.fd);
+    gst_tcp_socket_close (&this->server_sock);
     GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
         ("Could not setsockopt: %s", g_strerror (errno)));
     return FALSE;
@@ -279,7 +279,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
   ret = 1;
   if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE,
           (void *) &ret, sizeof (ret)) < 0) {
-    gst_tcp_socket_close (&this->server_sock.fd);
+    gst_tcp_socket_close (&this->server_sock);
     GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
         ("Could not setsockopt: %s", g_strerror (errno)));
     return FALSE;
@@ -297,7 +297,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
       sizeof (this->server_sin));
 
   if (ret) {
-    gst_tcp_socket_close (&this->server_sock.fd);
+    gst_tcp_socket_close (&this->server_sock);
     switch (errno) {
       default:
         GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
@@ -314,7 +314,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
   GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
       this->server_sock.fd, TCP_BACKLOG);
   if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
-    gst_tcp_socket_close (&this->server_sock.fd);
+    gst_tcp_socket_close (&this->server_sock);
     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
         ("Could not listen on server socket: %s", g_strerror (errno)));
     return FALSE;
@@ -323,8 +323,8 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent)
       "listened on server socket %d, returning from connection setup",
       this->server_sock.fd);
 
-  gst_fdset_add_fd (parent->fdset, &this->server_sock);
-  gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
+  gst_poll_add_fd (parent->fdset, &this->server_sock);
+  gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
 
   return TRUE;
 }
@@ -335,7 +335,7 @@ gst_tcp_server_sink_close (GstMultiFdSink * parent)
   GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
 
   if (this->server_sock.fd != -1) {
-    gst_fdset_remove_fd (parent->fdset, &this->server_sock);
+    gst_poll_remove_fd (parent->fdset, &this->server_sock);
 
     close (this->server_sock.fd);
     this->server_sock.fd = -1;
index 9984194..ac8846d 100644 (file)
@@ -76,7 +76,7 @@ struct _GstTCPServerSink {
   struct sockaddr_in server_sin;
 
   /* socket */
-  GstFD server_sock;
+  GstPollFD server_sock;
 };
 
 struct _GstTCPServerSinkClass {
index b638f43..329a04b 100644 (file)
 #include <fcntl.h>
 
 
-/* control stuff stolen from fdsrc */
-#define CONTROL_STOP            'S'     /* stop the select call */
-#define CONTROL_SOCKETS(o)      o->control_fds
-#define WRITE_SOCKET(o)         o->control_fds[1]
-#define READ_SOCKET(o)          o->control_fds[0]
-
-#define SEND_COMMAND(o, command)          \
-G_STMT_START {                              \
-  unsigned char c; c = command;             \
-  write (WRITE_SOCKET(o), &c, 1);         \
-} G_STMT_END
-
-#define READ_COMMAND(o, command, res)        \
-G_STMT_START {                                 \
-  res = read(READ_SOCKET(o), &command, 1);   \
-} G_STMT_END
-
-
 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
 #define GST_CAT_DEFAULT tcpserversrc_debug
 
@@ -147,13 +129,10 @@ gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class)
 {
   src->server_port = TCP_DEFAULT_PORT;
   src->host = g_strdup (TCP_DEFAULT_HOST);
-  src->server_sock_fd = -1;
-  src->client_sock_fd = -1;
+  src->server_sock_fd.fd = -1;
+  src->client_sock_fd.fd = -1;
   src->protocol = GST_TCP_PROTOCOL_NONE;
 
-  READ_SOCKET (src) = -1;
-  WRITE_SOCKET (src) = -1;
-
   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
 }
 
@@ -172,8 +151,6 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPServerSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
-  fd_set testfds;
-  int maxfdp1;
 
   src = GST_TCP_SERVER_SRC (psrc);
 
@@ -181,36 +158,33 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
     goto wrong_state;
 
 restart:
-  /* do a blocking select on the socket */
-  FD_ZERO (&testfds);
-
-  /* always select on cancel socket */
-  FD_SET (READ_SOCKET (src), &testfds);
-
-  if (src->client_sock_fd >= 0) {
+  if (src->client_sock_fd.fd >= 0) {
     /* if we have a client, wait for read */
-    FD_SET (src->client_sock_fd, &testfds);
-    maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1;
+    gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE);
+    gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE);
   } else {
     /* else wait on server socket for connections */
-    FD_SET (src->server_sock_fd, &testfds);
-    maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1;
+    gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE);
   }
 
   /* no action (0) is an error too in our case */
-  if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
-    goto select_error;
-
-  if (FD_ISSET (READ_SOCKET (src), &testfds))
-    goto select_cancelled;
+  if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+    if (ret == -1 && errno == EBUSY)
+      goto select_cancelled;
+    else
+      goto select_error;
+  }
 
   /* if we have no client socket we can accept one now */
-  if (src->client_sock_fd < 0) {
-    if (FD_ISSET (src->server_sock_fd, &testfds)) {
-      if ((src->client_sock_fd =
-              accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
+  if (src->client_sock_fd.fd < 0) {
+    if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) {
+      if ((src->client_sock_fd.fd =
+              accept (src->server_sock_fd.fd,
+                  (struct sockaddr *) &src->client_sin,
                   &src->client_sin_len)) == -1)
         goto accept_error;
+
+      gst_poll_add_fd (src->fdset, &src->client_sock_fd);
     }
     /* and restart now to poll the socket. */
     goto restart;
@@ -220,8 +194,8 @@ restart:
 
   switch (src->protocol) {
     case GST_TCP_PROTOCOL_NONE:
-      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
-          READ_SOCKET (src), outbuf);
+      ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
+          src->fdset, outbuf);
       break;
 
     case GST_TCP_PROTOCOL_GDP:
@@ -229,8 +203,8 @@ restart:
         GstCaps *caps;
         gchar *string;
 
-        ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd,
-            READ_SOCKET (src), &caps);
+        ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd,
+            src->fdset, &caps);
 
         if (ret == GST_FLOW_WRONG_STATE)
           goto gdp_cancelled;
@@ -246,8 +220,8 @@ restart:
         gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
       }
 
-      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
-          READ_SOCKET (src), outbuf);
+      ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
+          src->fdset, outbuf);
 
       if (ret == GST_FLOW_OK)
         gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
@@ -369,26 +343,19 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
   int ret;
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
 
-  /* create the control sockets before anything */
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
-    goto socket_pair;
-
-  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
-  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-
   /* reset caps_received flag */
   src->caps_received = FALSE;
 
   /* create the server listener socket */
-  if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+  if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
     goto socket_error;
 
   GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
-      src->server_sock_fd);
+      src->server_sock_fd.fd);
 
   /* make address reusable */
   ret = 1;
-  if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+  if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
           sizeof (int)) < 0)
     goto sock_opt;
 
@@ -408,16 +375,22 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
 
   /* bind it */
   GST_DEBUG_OBJECT (src, "binding server socket to address");
-  if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
+  if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin,
               sizeof (src->server_sin))) < 0)
     goto bind_error;
 
   GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
-      src->server_sock_fd, TCP_BACKLOG);
+      src->server_sock_fd.fd, TCP_BACKLOG);
 
-  if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
+  if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1)
     goto listen_error;
 
+  /* create an fdset to keep track of our file descriptors */
+  if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
+    goto socket_pair;
+
+  gst_poll_add_fd (src->fdset, &src->server_sock_fd);
+
   GST_DEBUG_OBJECT (src, "received client");
 
   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
@@ -425,12 +398,6 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc)
   return TRUE;
 
   /* ERRORS */
-socket_pair:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
-        GST_ERROR_SYSTEM);
-    return FALSE;
-  }
 socket_error:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
@@ -440,6 +407,7 @@ sock_opt:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
         ("Could not setsockopt: %s", g_strerror (errno)));
+    gst_tcp_socket_close (&src->server_sock_fd);
     return FALSE;
   }
 host_error:
@@ -465,6 +433,13 @@ listen_error:
         ("Could not listen on server socket: %s", g_strerror (errno)));
     return FALSE;
   }
+socket_pair:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+        GST_ERROR_SYSTEM);
+    gst_tcp_socket_close (&src->server_sock_fd);
+    return FALSE;
+  }
 }
 
 static gboolean
@@ -472,20 +447,13 @@ gst_tcp_server_src_stop (GstBaseSrc * bsrc)
 {
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
 
-  if (src->server_sock_fd != -1) {
-    close (src->server_sock_fd);
-    src->server_sock_fd = -1;
-  }
-  if (src->client_sock_fd != -1) {
-    close (src->client_sock_fd);
-    src->client_sock_fd = -1;
-  }
-  GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
+  gst_poll_free (src->fdset);
+  src->fdset = NULL;
 
-  close (READ_SOCKET (src));
-  close (WRITE_SOCKET (src));
-  READ_SOCKET (src) = -1;
-  WRITE_SOCKET (src) = -1;
+  gst_tcp_socket_close (&src->server_sock_fd);
+  gst_tcp_socket_close (&src->client_sock_fd);
+
+  GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
 
   return TRUE;
 }
@@ -496,7 +464,7 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
 {
   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
 
-  SEND_COMMAND (src, CONTROL_STOP);
+  gst_poll_set_flushing (src->fdset, TRUE);
 
   return TRUE;
 }
index 4e32f47..22c7afe 100644 (file)
@@ -65,14 +65,14 @@ struct _GstTCPServerSrc {
   int server_port;
   gchar *host;
   struct sockaddr_in server_sin;
-  int server_sock_fd;
+  GstPollFD server_sock_fd;
 
   /* client information */
   struct sockaddr_in client_sin;
   socklen_t client_sin_len;
-  int client_sock_fd;
+  GstPollFD client_sock_fd;
 
-  int control_fds[2];
+  GstPoll *fdset;
 
   GstTCPProtocol protocol; /* protocol used for reading data */
   gboolean caps_received;      /* if we have received caps yet */