From 405571a67e3ec5e18600da8e02a7e561a01f17d4 Mon Sep 17 00:00:00 2001 From: Peter Kjellerstedt Date: Thu, 28 Feb 2008 10:54:14 +0000 Subject: [PATCH] gst/tcp/: Removed fdset and stress test, they are now known as GstPoll in core. Original commit message from CVS: Patch by: Peter Kjellerstedt * gst/tcp/Makefile.am: * gst/tcp/fdsetstress.c: * gst/tcp/gstfdset.c: * gst/tcp/gstfdset.h: Removed fdset and stress test, they are now known as GstPoll in core. * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init), (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove), (gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link), (gst_multi_fd_sink_handle_client_write), (gst_multi_fd_sink_queue_buffer), (gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start), (gst_multi_fd_sink_stop): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close), (gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps): * gst/tcp/gsttcp.h: * gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init), (gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render), (gst_tcp_client_sink_start), (gst_tcp_client_sink_stop): * gst/tcp/gsttcpclientsink.h: * gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init), (gst_tcp_client_src_create), (gst_tcp_client_src_start), (gst_tcp_client_src_stop), (gst_tcp_client_src_unlock): * gst/tcp/gsttcpclientsrc.h: * gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait), (gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close): * gst/tcp/gsttcpserversink.h: * gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init), (gst_tcp_server_src_create), (gst_tcp_server_src_start), (gst_tcp_server_src_stop), (gst_tcp_server_src_unlock): * gst/tcp/gsttcpserversrc.h: Port to GstPoll. See #505417. --- ChangeLog | 40 +++ gst/tcp/Makefile.am | 8 - gst/tcp/fdsetstress.c | 179 ------------- gst/tcp/gstfdset.c | 532 ------------------------------------- gst/tcp/gstfdset.h | 68 ----- gst/tcp/gstmultifdsink.c | 124 ++------- gst/tcp/gstmultifdsink.h | 11 +- gst/tcp/gsttcp.c | 66 ++--- gst/tcp/gsttcp.h | 10 +- gst/tcp/gsttcpclientsink.c | 21 +- gst/tcp/gsttcpclientsink.h | 2 +- gst/tcp/gsttcpclientsrc.c | 63 ++--- gst/tcp/gsttcpclientsrc.h | 4 +- gst/tcp/gsttcpserversink.c | 20 +- gst/tcp/gsttcpserversink.h | 2 +- gst/tcp/gsttcpserversrc.c | 136 ++++------ gst/tcp/gsttcpserversrc.h | 6 +- 17 files changed, 196 insertions(+), 1096 deletions(-) delete mode 100644 gst/tcp/fdsetstress.c delete mode 100644 gst/tcp/gstfdset.c delete mode 100644 gst/tcp/gstfdset.h diff --git a/ChangeLog b/ChangeLog index 979d1bc5a6..bf22627dba 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,43 @@ +2008-02-28 Wim Taymans + + Patch by: Peter Kjellerstedt + + * gst/tcp/Makefile.am: + * gst/tcp/fdsetstress.c: + * gst/tcp/gstfdset.c: + * gst/tcp/gstfdset.h: + Removed fdset and stress test, they are now known as GstPoll in + core. + + * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init), + (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove), + (gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link), + (gst_multi_fd_sink_handle_client_write), + (gst_multi_fd_sink_queue_buffer), + (gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start), + (gst_multi_fd_sink_stop): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close), + (gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer), + (gst_tcp_gdp_read_caps): + * gst/tcp/gsttcp.h: + * gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init), + (gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render), + (gst_tcp_client_sink_start), (gst_tcp_client_sink_stop): + * gst/tcp/gsttcpclientsink.h: + * gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init), + (gst_tcp_client_src_create), (gst_tcp_client_src_start), + (gst_tcp_client_src_stop), (gst_tcp_client_src_unlock): + * gst/tcp/gsttcpclientsrc.h: + * gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait), + (gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close): + * gst/tcp/gsttcpserversink.h: + * gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init), + (gst_tcp_server_src_create), (gst_tcp_server_src_start), + (gst_tcp_server_src_stop), (gst_tcp_server_src_unlock): + * gst/tcp/gsttcpserversrc.h: + Port to GstPoll. See #505417. + 2008-02-28 Wim Taymans Patch by: Peter Kjellerstedt diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index 02100c0063..09a566ee5c 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -15,7 +15,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers) libgsttcp_la_SOURCES = \ gsttcpplugin.c \ gsttcp.c \ - gstfdset.c \ gstmultifdsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \ gsttcpserversrc.c gsttcpserversink.c @@ -31,7 +30,6 @@ libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_GDP_LIBS) $(GST_LIBS) noinst_HEADERS = \ gsttcpplugin.h \ gsttcp.h \ - gstfdset.h \ gstmultifdsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \ gsttcpserversrc.h gsttcpserversink.h @@ -39,9 +37,3 @@ noinst_HEADERS = \ CLEANFILES = $(BUILT_SOURCES) EXTRA_DIST = gsttcp-marshal.list - -noinst_PROGRAMS = fdsetstress - -fdsetstress_SOURCES = fdsetstress.c gstfdset.c -fdsetstress_CFLAGS = $(GST_CFLAGS) -fdsetstress_LDFLAGS = $(GST_LIBS) diff --git a/gst/tcp/fdsetstress.c b/gst/tcp/fdsetstress.c deleted file mode 100644 index 8bc13e3b56..0000000000 --- a/gst/tcp/fdsetstress.c +++ /dev/null @@ -1,179 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * Copyright (C) <2004> Thomas Vander Stichele - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#include -#include - -#include "gstfdset.h" - -static GstFDSet *set; -static GList *fds = NULL; -static GMutex *fdlock; -static GTimer *timer; - -#define MAX_THREADS 100 - -static void -mess_some_more () -{ - GList *walk; - gint random; - gint removed = 0; - - g_mutex_lock (fdlock); - - for (walk = fds; walk;) { - GstFD *fd = (GstFD *) walk->data; - - walk = g_list_next (walk); - - random = (gint) (10.0 * rand () / (RAND_MAX + 1.0)); - switch (random) { - case 0: - { - /* - GstFD *newfd = g_new0 (GstFD, 1); - - gst_fdset_add_fd (set, newfd); - fds = g_list_prepend (fds, newfd); - */ - break; - } - case 1: - if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) { - gst_fdset_remove_fd (set, fd); - fds = g_list_remove (fds, fd); - g_free (fd); - removed++; - } - break; - - case 2: - gst_fdset_fd_ctl_write (set, fd, TRUE); - break; - case 3: - gst_fdset_fd_ctl_write (set, fd, FALSE); - break; - case 4: - gst_fdset_fd_ctl_read (set, fd, TRUE); - break; - case 5: - gst_fdset_fd_ctl_read (set, fd, FALSE); - break; - - case 6: - gst_fdset_fd_has_closed (set, fd); - break; - case 7: - gst_fdset_fd_has_error (set, fd); - break; - case 8: - gst_fdset_fd_can_read (set, fd); - break; - case 9: - gst_fdset_fd_can_write (set, fd); - break; - default: - g_assert_not_reached (); - break; - } - } - if (g_list_length (fds) < 900) { - random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0)); - while (random) { - GstFD *newfd = g_new0 (GstFD, 1); - - gst_fdset_add_fd (set, newfd); - fds = g_list_prepend (fds, newfd); - random--; - } - } - - g_mutex_unlock (fdlock); -} - -void * -run_test (void *threadid) -{ - gint id = GPOINTER_TO_INT (threadid); - - while (TRUE) { - if (id == 0) { - gint res = gst_fdset_wait (set, 10); - - if (res < 0) { - g_print ("error %d %s\n", errno, g_strerror (errno)); - } - } else { - mess_some_more (); - if (g_timer_elapsed (timer, NULL) > 0.5) { - g_mutex_lock (fdlock); - g_print ("active fds :%d\n", g_list_length (fds)); - g_timer_start (timer); - g_mutex_unlock (fdlock); - } - g_usleep (1); - } - } - - g_thread_exit (NULL); - return NULL; -} - -gint -main (gint argc, gchar * argv[]) -{ - GThread *threads[MAX_THREADS]; - gint num_threads; - gint t; - - gst_init (&argc, &argv); - - fdlock = g_mutex_new (); - timer = g_timer_new (); - - if (argc != 2) { - g_print ("usage: %s \n", argv[0]); - exit (-1); - } - - num_threads = atoi (argv[1]); - - set = gst_fdset_new (GST_FDSET_MODE_POLL); - - for (t = 0; t < num_threads; t++) { - GError *error = NULL; - - threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error); - if (error) { - printf ("ERROR: g_thread_create() %s\n", error->message); - exit (-1); - } - } - printf ("main(): Created %d threads.\n", t); - - for (t = 0; t < num_threads; t++) { - g_thread_join (threads[t]); - } - - gst_fdset_free (set); - - return 0; -} diff --git a/gst/tcp/gstfdset.c b/gst/tcp/gstfdset.c deleted file mode 100644 index 5f222381ad..0000000000 --- a/gst/tcp/gstfdset.c +++ /dev/null @@ -1,532 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * Copyright (C) <2004> Wim Taymans - * - * gsttcpfdset.h: fdset datastructure - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#define MIN_POLLFDS 32 -#define INIT_POLLFDS MIN_POLLFDS - -#include -#include -#include -#include -/* OS/X needs this because of bad headers */ -#include - -#include "gstfdset.h" - -GType -gst_fdset_mode_get_type (void) -{ - static GType fdset_mode_type = 0; - static const GEnumValue fdset_mode[] = { - {GST_FDSET_MODE_SELECT, "Select", "select"}, - {GST_FDSET_MODE_POLL, "Poll", "poll"}, - {GST_FDSET_MODE_EPOLL, "EPoll", "epoll"}, - {0, NULL, NULL}, - }; - - if (!fdset_mode_type) { - fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode); - } - return fdset_mode_type; -} - -struct _GstFDSet -{ - GstFDSetMode mode; - - /* for poll */ - struct pollfd *testpollfds; - gint last_testpollfds; - gint testsize; - - struct pollfd *pollfds; - gint size; - gint free; - gint last_pollfds; - GMutex *poll_lock; - - /* for select */ - fd_set readfds, writefds; /* input */ - fd_set testreadfds, testwritefds; /* output */ -}; - -static gint -nearest_pow (gint num) -{ - /* hacker's delight page 48 */ - num -= 1; - num |= num >> 1; - num |= num >> 2; - num |= num >> 4; - num |= num >> 8; - num |= num >> 16; - - return num + 1; -} - -/* resize a given pollfd array from old_size number of items - * to new_size number of items. Also initializes the new elements - * with the default values. */ -static struct pollfd * -resize (struct pollfd *fds, gint old_size, gint new_size) -{ - struct pollfd *res; - gint i; - - res = g_realloc (fds, new_size * sizeof (struct pollfd)); - for (i = old_size; i < new_size; i++) { - res[i].fd = -1; - res[i].events = 0; - res[i].revents = 0; - } - return res; -} - -static void -ensure_size (GstFDSet * set, gint len) -{ - if (len > set->size) { - len = nearest_pow (len); - len = MAX (len, MIN_POLLFDS); - - set->pollfds = resize (set->pollfds, set->size, len); - set->size = len; - } -} - -GstFDSet * -gst_fdset_new (GstFDSetMode mode) -{ - GstFDSet *nset; - - nset = g_new0 (GstFDSet, 1); - nset->mode = mode; - - switch (mode) { - case GST_FDSET_MODE_SELECT: - FD_ZERO (&nset->readfds); - FD_ZERO (&nset->writefds); - break; - case GST_FDSET_MODE_POLL: - nset->pollfds = NULL; - nset->testpollfds = NULL; - nset->free = 0; - nset->last_pollfds = 0; - nset->poll_lock = g_mutex_new (); - ensure_size (nset, MIN_POLLFDS); - break; - case GST_FDSET_MODE_EPOLL: - g_warning ("implement EPOLL mode in GstFDSet"); - break; - default: - break; - } - return nset; -} - -void -gst_fdset_free (GstFDSet * set) -{ - g_return_if_fail (set != NULL); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - break; - case GST_FDSET_MODE_POLL: - g_free (set->testpollfds); - g_free (set->pollfds); - g_mutex_free (set->poll_lock); - break; - case GST_FDSET_MODE_EPOLL: - g_warning ("implement EPOLL mode in GstFDSet"); - break; - default: - break; - } - g_free (set); -} - - -void -gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode) -{ - g_return_if_fail (set != NULL); - - g_warning ("implement set_mode in GstFDSet"); -} - -GstFDSetMode -gst_fdset_get_mode (GstFDSet * set) -{ - g_return_val_if_fail (set != NULL, FALSE); - - return set->mode; -} - -gboolean -gst_fdset_add_fd (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - res = TRUE; - break; - case GST_FDSET_MODE_POLL: - { - struct pollfd *nfd; - gint idx; - - g_mutex_lock (set->poll_lock); - - ensure_size (set, set->last_pollfds + 1); - - idx = set->free; - if (idx == -1) { - /* find free space */ - while (idx < set->last_pollfds) { - idx++; - if (set->pollfds[idx].fd == -1) - break; - } - } - nfd = &set->pollfds[idx]; - - nfd->fd = fd->fd; - nfd->events = POLLERR | POLLNVAL | POLLHUP; - nfd->revents = 0; - - /* see if we have one fd more */ - set->last_pollfds = MAX (idx + 1, set->last_pollfds); - fd->idx = idx; - set->free = -1; - g_mutex_unlock (set->poll_lock); - - res = TRUE; - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -gboolean -gst_fdset_remove_fd (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - /* nothing */ - FD_CLR (fd->fd, &set->writefds); - FD_CLR (fd->fd, &set->readfds); - res = TRUE; - break; - case GST_FDSET_MODE_POLL: - { - g_mutex_lock (set->poll_lock); - - /* FIXME on some platforms poll doesn't ignore the fd - * when set to -1 */ - set->pollfds[fd->idx].fd = -1; - set->pollfds[fd->idx].events = 0; - set->pollfds[fd->idx].revents = 0; - - /* if we removed the last fd, we can lower the last_pollfds */ - if (fd->idx + 1 == set->last_pollfds) { - set->last_pollfds--; - } - fd->idx = -1; - - if (set->free == -1) { - set->free = fd->idx; - } else { - set->free = MIN (set->free, fd->idx); - } - g_mutex_unlock (set->poll_lock); - res = TRUE; - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -void -gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active) -{ - g_return_if_fail (set != NULL); - g_return_if_fail (fd != NULL); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - if (active) - FD_SET (fd->fd, &set->writefds); - else - FD_CLR (fd->fd, &set->writefds); - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - - idx = fd->idx; - if (idx >= 0) { - gint events = set->pollfds[idx].events; - - if (active) - events |= POLLOUT; - else - events &= ~POLLOUT; - - set->pollfds[idx].events = events; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } -} - -void -gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active) -{ - g_return_if_fail (set != NULL); - g_return_if_fail (fd != NULL); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - if (active) - FD_SET (fd->fd, &set->readfds); - else - FD_CLR (fd->fd, &set->readfds); - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - - idx = fd->idx; - if (idx >= 0) { - gint events = set->pollfds[idx].events; - - if (active) - events |= (POLLIN | POLLPRI); - else - events &= ~(POLLIN | POLLPRI); - - set->pollfds[idx].events = events; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } -} - -gboolean -gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - res = FALSE; - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - idx = fd->idx; - - if (idx >= 0 && idx < set->last_testpollfds) { - res = (set->testpollfds[idx].revents & POLLHUP) != 0; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -gboolean -gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - res = FALSE; - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - idx = fd->idx; - - if (idx >= 0 && idx < set->last_testpollfds) { - res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -gboolean -gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - res = FD_ISSET (fd->fd, &set->testreadfds); - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - idx = fd->idx; - - if (idx >= 0 && idx < set->last_testpollfds) { - res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -gboolean -gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd) -{ - gboolean res = FALSE; - - g_return_val_if_fail (set != NULL, FALSE); - g_return_val_if_fail (fd != NULL, FALSE); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - res = FD_ISSET (fd->fd, &set->testwritefds); - break; - case GST_FDSET_MODE_POLL: - { - gint idx; - - g_mutex_lock (set->poll_lock); - idx = fd->idx; - - if (idx >= 0 && idx < set->last_testpollfds) { - res = (set->testpollfds[idx].revents & POLLOUT) != 0; - } - g_mutex_unlock (set->poll_lock); - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - return res; -} - -gint -gst_fdset_wait (GstFDSet * set, int timeout) -{ - int res = -1; - - g_return_val_if_fail (set != NULL, -1); - - switch (set->mode) { - case GST_FDSET_MODE_SELECT: - { - struct timeval tv; - struct timeval *tvptr; - - set->testreadfds = set->readfds; - set->testwritefds = set->writefds; - - if (timeout > 0) { - tv.tv_sec = timeout / 1000; - tv.tv_usec = timeout % 1000; - - tvptr = &tv; - } else { - tvptr = NULL; - } - res = - select (FD_SETSIZE, &set->testreadfds, &set->testwritefds, - (fd_set *) 0, tvptr); - break; - } - case GST_FDSET_MODE_POLL: - { - g_mutex_lock (set->poll_lock); - if (set->testsize != set->size) { - set->testpollfds = resize (set->testpollfds, set->testsize, set->size); - set->testsize = set->size; - } - set->last_testpollfds = set->last_pollfds; - memcpy (set->testpollfds, set->pollfds, - sizeof (struct pollfd) * set->last_testpollfds); - g_mutex_unlock (set->poll_lock); - - res = poll (set->testpollfds, set->last_testpollfds, timeout); - - break; - } - case GST_FDSET_MODE_EPOLL: - break; - } - - return res; -} diff --git a/gst/tcp/gstfdset.h b/gst/tcp/gstfdset.h deleted file mode 100644 index 1f9349066e..0000000000 --- a/gst/tcp/gstfdset.h +++ /dev/null @@ -1,68 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * Copyright (C) <2004> Wim Taymans - * - * gstfdset.h: fdset datastructure - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifndef __GST_FDSET_H__ -#define __GST_FDSET_H__ - -#include - -G_BEGIN_DECLS - -typedef struct _GstFDSet GstFDSet; - -typedef struct { - int fd; - gint idx; -} GstFD; - -typedef enum { - GST_FDSET_MODE_SELECT, - GST_FDSET_MODE_POLL, - GST_FDSET_MODE_EPOLL -} GstFDSetMode; - -#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type()) -GType gst_fdset_mode_get_type (void); - - -GstFDSet* gst_fdset_new (GstFDSetMode mode); -void gst_fdset_free (GstFDSet *set); - -void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode); -GstFDSetMode gst_fdset_get_mode (GstFDSet *set); - -gboolean gst_fdset_add_fd (GstFDSet *set, GstFD *fd); -gboolean gst_fdset_remove_fd (GstFDSet *set, GstFD *fd); - -void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active); -void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active); - -gboolean gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd); -gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd); -gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd); -gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd); - -gint gst_fdset_wait (GstFDSet *set, int timeout); - -G_END_DECLS - -#endif /* __GST_FDSET_H__ */ diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index d115d806f5..6f6a092163 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -132,25 +132,6 @@ #define NOT_IMPLEMENTED 0 -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock or restart the select call */ -#define CONTROL_RESTART 'R' /* restart the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(sink) sink->control_sock -#define WRITE_SOCKET(sink) sink->control_sock[1] -#define READ_SOCKET(sink) sink->control_sock[0] - -#define SEND_COMMAND(sink, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(sink).fd, &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(sink, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(sink).fd, &command, 1);\ -} G_STMT_END - /* elementfactory information */ static const GstElementDetails gst_multi_fd_sink_details = GST_ELEMENT_DETAILS ("Multi filedescriptor sink", @@ -188,7 +169,7 @@ enum /* this is really arbitrarily chosen */ #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE -#define DEFAULT_MODE GST_FDSET_MODE_POLL +#define DEFAULT_MODE GST_POLL_MODE_AUTO #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_TIME_MIN -1 @@ -380,7 +361,7 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_MODE, g_param_spec_enum ("mode", "Mode", - "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE, + "The mode for selecting activity on the fds", GST_TYPE_POLL_MODE, DEFAULT_MODE, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, @@ -733,12 +714,12 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, /* set the socket to non blocking */ res = fcntl (fd, F_SETFL, O_NONBLOCK); /* we always read from a client */ - gst_fdset_add_fd (sink->fdset, &client->fd); + gst_poll_add_fd (sink->fdset, &client->fd); /* we don't try to read from write only fds */ flags = fcntl (fd, F_GETFL, 0); if ((flags & O_ACCMODE) != O_WRONLY) { - gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE); + gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE); } /* figure out the mode, can't use send() for non sockets */ res = fstat (fd, &statbuf); @@ -746,7 +727,7 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, client->is_socket = TRUE; } - SEND_COMMAND (sink, CONTROL_RESTART); + gst_poll_restart (sink->fdset); CLIENTS_UNLOCK (sink); @@ -807,7 +788,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) client->status = GST_CLIENT_STATUS_REMOVED; gst_multi_fd_sink_remove_client_link (sink, clink); - SEND_COMMAND (sink, CONTROL_RESTART); + gst_poll_restart (sink->fdset); } else { GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); } @@ -877,7 +858,7 @@ restart: client->status = GST_CLIENT_STATUS_REMOVED; gst_multi_fd_sink_remove_client_link (sink, clients); } - SEND_COMMAND (sink, CONTROL_RESTART); + gst_poll_restart (sink->fdset); CLIENTS_UNLOCK (sink); } @@ -1010,7 +991,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) break; } - gst_fdset_remove_fd (sink->fdset, &client->fd); + gst_poll_remove_fd (sink->fdset, &client->fd); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); @@ -1877,7 +1858,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, if (client->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ - gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); + gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); /* if we flushed out all of the client buffers, we can stop */ if (client->flushcount == 0) goto flushed; @@ -1898,7 +1879,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, client->bufpos = position; } else { /* cannot send data to this client yet */ - gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); + gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); return TRUE; } } @@ -2164,7 +2145,7 @@ restart: } else if (client->bufpos == 0 || client->new_connection) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ - gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE); + gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE); need_signal = TRUE; } /* keep track of maximum buffer usage */ @@ -2241,7 +2222,7 @@ restart: /* and send a signal to thread if fd_set changed */ if (need_signal) { - SEND_COMMAND (sink, CONTROL_RESTART); + gst_poll_restart (sink->fdset); } } @@ -2265,8 +2246,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); do { - gboolean stop = FALSE; - try_again = FALSE; /* check for: @@ -2274,7 +2253,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) * - client socket input (ie, clients saying goodbye) * - client socket output (ie, client reads) */ GST_LOG_OBJECT (sink, "waiting on action on fdset"); - result = gst_fdset_wait (sink->fdset, -1); + result = gst_poll_wait (sink->fdset, GST_CLOCK_TIME_NONE); /* < 0 is an error, 0 just means a timeout happened, which is impossible */ if (result < 0) { @@ -2317,8 +2296,11 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) * are not valid */ try_again = TRUE; } else if (errno == EINTR) { - /* interrupted system call, just redo the select */ + /* interrupted system call, just redo the wait */ try_again = TRUE; + } else if (errno == EBUSY) { + /* the call to gst_poll_wait() was flushed */ + return; } else { /* this is quite bad... */ GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), @@ -2327,46 +2309,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) } } else { GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result); - /* read all commands */ - if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) { - GST_LOG_OBJECT (sink, "have a command"); - while (TRUE) { - gchar command; - int res; - - READ_COMMAND (sink, command, res); - if (res <= 0) { - GST_LOG_OBJECT (sink, "no more commands"); - /* no more commands */ - break; - } - - switch (command) { - case CONTROL_RESTART: - GST_LOG_OBJECT (sink, "restart"); - /* need to restart the select call as the fd_set changed */ - /* if other file descriptors than the READ_SOCKET had activity, - * we don't restart just yet, but handle the other clients first - */ - if (result == 1) - try_again = TRUE; - break; - case CONTROL_STOP: - /* break out of the select loop */ - GST_LOG_OBJECT (sink, "stop"); - /* stop this function */ - stop = TRUE; - break; - default: - GST_WARNING_OBJECT (sink, "unkown"); - g_warning ("multifdsink: unknown control message received"); - break; - } - } - } - } - if (stop) { - return; } } while (try_again); @@ -2396,25 +2338,25 @@ restart2: continue; } - if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) { + if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) { client->status = GST_CLIENT_STATUS_CLOSED; gst_multi_fd_sink_remove_client_link (sink, clients); continue; } - if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) { - GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd.fd); + if (gst_poll_fd_has_error (sink->fdset, &client->fd)) { + GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd); client->status = GST_CLIENT_STATUS_ERROR; gst_multi_fd_sink_remove_client_link (sink, clients); continue; } - if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) { + if (gst_poll_fd_can_read (sink->fdset, &client->fd)) { /* handle client read */ if (!gst_multi_fd_sink_handle_client_read (sink, client)) { gst_multi_fd_sink_remove_client_link (sink, clients); continue; } } - if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) { + if (gst_poll_fd_can_write (sink->fdset, &client->fd)) { /* handle client write */ if (!gst_multi_fd_sink_handle_client_write (sink, client)) { gst_multi_fd_sink_remove_client_link (sink, clients); @@ -2679,7 +2621,6 @@ static gboolean gst_multi_fd_sink_start (GstBaseSink * bsink) { GstMultiFdSinkClass *fclass; - int control_socket[2]; GstMultiFdSink *this; if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) @@ -2689,20 +2630,9 @@ gst_multi_fd_sink_start (GstBaseSink * bsink) fclass = GST_MULTI_FD_SINK_GET_CLASS (this); GST_INFO_OBJECT (this, "starting in mode %d", this->mode); - this->fdset = gst_fdset_new (this->mode); - - if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) + if ((this->fdset = gst_poll_new (this->mode, TRUE)) == NULL) goto socket_pair; - READ_SOCKET (this).fd = control_socket[0]; - WRITE_SOCKET (this).fd = control_socket[1]; - - gst_fdset_add_fd (this->fdset, &READ_SOCKET (this)); - gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE); - - fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK); - this->streamheader = NULL; this->bytes_to_serve = 0; this->bytes_served = 0; @@ -2750,7 +2680,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) this->running = FALSE; - SEND_COMMAND (this, CONTROL_STOP); + gst_poll_set_flushing (this->fdset, TRUE); if (this->thread) { GST_DEBUG_OBJECT (this, "joining thread"); g_thread_join (this->thread); @@ -2761,9 +2691,6 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) /* free the clients */ gst_multi_fd_sink_clear (this); - close (READ_SOCKET (this).fd); - close (WRITE_SOCKET (this).fd); - if (this->streamheader) { g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); g_slist_free (this->streamheader); @@ -2774,8 +2701,7 @@ gst_multi_fd_sink_stop (GstBaseSink * bsink) fclass->close (this); if (this->fdset) { - gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this)); - gst_fdset_free (this->fdset); + gst_poll_free (this->fdset); this->fdset = NULL; } g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this); diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index baf70d713f..a4f557eafe 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -28,7 +28,6 @@ G_BEGIN_DECLS #include "gsttcp.h" -#include "gstfdset.h" #define GST_TYPE_MULTI_FD_SINK \ (gst_multi_fd_sink_get_type()) @@ -140,7 +139,7 @@ typedef enum /* structure for a client */ typedef struct { - GstFD fd; + GstPollFD fd; gint bufpos; /* position of this client in the global queue */ gint flushcount; /* the remaining number of buffers to flush out or -1 if the @@ -201,10 +200,8 @@ struct _GstMultiFdSink { GHashTable *fd_hash; /* index on fd to client */ guint clients_cookie; /* Cookie to detect changes to the clients list */ - GstFDSetMode mode; - GstFDSet *fdset; - - GstFD control_sock[2];/* sockets for controlling the select call */ + GstPollMode mode; + GstPoll *fdset; GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ gboolean previous_buffer_in_caps; @@ -259,7 +256,7 @@ struct _GstMultiFdSinkClass { /* vtable */ gboolean (*init) (GstMultiFdSink *sink); - gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set); + gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set); gboolean (*close) (GstMultiFdSink *sink); void (*removed) (GstMultiFdSink *sink, int fd); diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c index 1ee91fb646..584a25ad3b 100644 --- a/gst/tcp/gsttcp.c +++ b/gst/tcp/gsttcp.c @@ -29,7 +29,6 @@ #include #include #include -#include /* memset, in FD_ZERO macro */ #include #include @@ -128,30 +127,24 @@ gst_tcp_socket_write (int socket, const void *buf, size_t count) */ static GstFlowReturn gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count, - int cancel_fd) + GstPoll * fdset) { - fd_set testfds; - int maxfdp1; ssize_t n; size_t bytes_read; int num_to_read; + int ret; bytes_read = 0; while (bytes_read < count) { /* do a blocking select on the socket */ - FD_ZERO (&testfds); - FD_SET (socket, &testfds); - if (cancel_fd >= 0) - FD_SET (cancel_fd, &testfds); - maxfdp1 = MAX (socket, cancel_fd) + 1; - /* no action (0) is an error too in our case */ - if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0) - goto select_error; - - if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds)) - goto cancelled; + if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { + if (ret == -1 && errno == EBUSY) + goto cancelled; + else + goto select_error; + } /* ask how much is available for reading on the socket */ if (ioctl (socket, FIONREAD, &num_to_read) < 0) @@ -216,10 +209,12 @@ short_read: /* close the socket and reset the fd. Used to clean up after errors. */ void -gst_tcp_socket_close (int *socket) +gst_tcp_socket_close (GstPollFD * socket) { - close (*socket); - *socket = -1; + if (socket->fd >= 0) { + close (socket->fd); + socket->fd = -1; + } } /* read a buffer from the given socket @@ -229,30 +224,23 @@ gst_tcp_socket_close (int *socket) * EOS */ GstFlowReturn -gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, +gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer ** buf) { - fd_set testfds; int ret; - int maxfdp1; ssize_t bytes_read; int readsize; *buf = NULL; /* do a blocking select on the socket */ - FD_ZERO (&testfds); - FD_SET (socket, &testfds); - if (cancel_fd >= 0) - FD_SET (cancel_fd, &testfds); - maxfdp1 = MAX (socket, cancel_fd) + 1; - /* no action (0) is an error too in our case */ - if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0) - goto select_error; - - if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds)) - goto cancelled; + if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { + if (ret == -1 && errno == EBUSY) + goto cancelled; + else + goto select_error; + } /* ask how much is available for reading on the socket */ if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0) @@ -326,7 +314,7 @@ short_read: * EOS */ GstFlowReturn -gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, +gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer ** buf) { GstFlowReturn ret; @@ -338,8 +326,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, *buf = NULL; header = g_malloc (GST_DP_HEADER_LENGTH); - ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, - cancel_fd); + ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); if (ret != GST_FLOW_OK) goto header_read_error; @@ -357,7 +344,7 @@ gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, g_free (header); ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf), - GST_BUFFER_SIZE (*buf), cancel_fd); + GST_BUFFER_SIZE (*buf), fdset); if (ret != GST_FLOW_OK) goto data_read_error; @@ -394,7 +381,7 @@ data_read_error: } GstFlowReturn -gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, +gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps ** caps) { GstFlowReturn ret; @@ -408,8 +395,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, *caps = NULL; header = g_malloc (GST_DP_HEADER_LENGTH); - ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, - cancel_fd); + ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); if (ret != GST_FLOW_OK) goto header_read_error; @@ -429,7 +415,7 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload", payload_length); - ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd); + ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset); if (ret != GST_FLOW_OK) goto payload_read_error; diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h index 2c15de3fd5..9a89667342 100644 --- a/gst/tcp/gsttcp.h +++ b/gst/tcp/gsttcp.h @@ -44,14 +44,14 @@ gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host); gint gst_tcp_socket_write (int socket, const void *buf, size_t count); -void gst_tcp_socket_close (int *socket); +void gst_tcp_socket_close (GstPollFD *socket); -GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf); +GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf); -GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf); -GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps); +GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf); +GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps); -GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd); +GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset); gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port); diff --git a/gst/tcp/gsttcpclientsink.c b/gst/tcp/gsttcpclientsink.c index 32a95c4272..8dc1d657df 100644 --- a/gst/tcp/gsttcpclientsink.c +++ b/gst/tcp/gsttcpclientsink.c @@ -160,7 +160,7 @@ gst_tcp_client_sink_init (GstTCPClientSink * this) this->host = g_strdup (TCP_DEFAULT_HOST); this->port = TCP_DEFAULT_PORT; - this->sock_fd = -1; + this->sock_fd.fd = -1; this->protocol = GST_TCP_PROTOCOL_NONE; GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN); } @@ -198,8 +198,8 @@ gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string); g_free (string); - if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps, - TRUE, sink->host, sink->port)) + if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd, + caps, TRUE, sink->host, sink->port)) goto gdp_write_error; sink->caps_sent = TRUE; @@ -241,7 +241,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf) break; case GST_TCP_PROTOCOL_GDP: GST_LOG_OBJECT (sink, "Sending buffer header through GDP"); - if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf, + if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf, TRUE, sink->host, sink->port)) goto gdp_write_error; break; @@ -250,7 +250,7 @@ gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf) } /* write buffer data */ - wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size); + wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size); if (wrote < size) goto write_error; @@ -348,12 +348,12 @@ gst_tcp_client_sink_start (GstTCPClientSink * this) /* create sending client socket */ GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host, this->port); - if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { + if ((this->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); return FALSE; } GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d", - this->sock_fd); + this->sock_fd.fd); /* look up name if we need to */ ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); @@ -371,7 +371,7 @@ gst_tcp_client_sink_start (GstTCPClientSink * this) g_free (ip); GST_DEBUG_OBJECT (this, "connecting to server"); - ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin, + ret = connect (this->sock_fd.fd, (struct sockaddr *) &this->server_sin, sizeof (this->server_sin)); if (ret) { @@ -405,10 +405,7 @@ gst_tcp_client_sink_stop (GstTCPClientSink * this) if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN)) return TRUE; - if (this->sock_fd != -1) { - close (this->sock_fd); - this->sock_fd = -1; - } + gst_tcp_socket_close (&this->sock_fd); GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN); diff --git a/gst/tcp/gsttcpclientsink.h b/gst/tcp/gsttcpclientsink.h index 873e2c6244..439987045c 100644 --- a/gst/tcp/gsttcpclientsink.h +++ b/gst/tcp/gsttcpclientsink.h @@ -73,7 +73,7 @@ struct _GstTCPClientSink { struct sockaddr_in server_sin; /* socket */ - int sock_fd; + GstPollFD sock_fd; size_t data_written; /* how much bytes have we written ? */ GstTCPProtocol protocol; /* used with the protocol enum */ diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index 87caea5864..89a3a24d8b 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -32,24 +32,6 @@ #include -/* control stuff stolen from fdsrc */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(o) o->control_fds -#define WRITE_SOCKET(o) o->control_fds[1] -#define READ_SOCKET(o) o->control_fds[0] - -#define SEND_COMMAND(o, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(o), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(o, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(o), &command, 1); \ -} G_STMT_END - - GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug); #define GST_CAT_DEFAULT tcpclientsrc_debug @@ -150,13 +132,10 @@ gst_tcp_client_src_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class) { this->port = TCP_DEFAULT_PORT; this->host = g_strdup (TCP_DEFAULT_HOST); - this->sock_fd = -1; + this->sock_fd.fd = -1; this->protocol = GST_TCP_PROTOCOL_NONE; this->caps = NULL; - READ_SOCKET (this) = -1; - WRITE_SOCKET (this) = -1; - gst_base_src_set_live (GST_BASE_SRC (this), TRUE); GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN); @@ -207,8 +186,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) /* read the buffer header if we're using a protocol */ switch (src->protocol) { case GST_TCP_PROTOCOL_NONE: - ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd, - READ_SOCKET (src), outbuf); + ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd, + src->fdset, outbuf); break; case GST_TCP_PROTOCOL_GDP: @@ -217,8 +196,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GstCaps *caps; GST_DEBUG_OBJECT (src, "getting caps through GDP"); - ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd, - READ_SOCKET (src), &caps); + ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd, + src->fdset, &caps); if (ret != GST_FLOW_OK) goto no_caps; @@ -227,8 +206,8 @@ gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) src->caps = caps; } - ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd, - READ_SOCKET (src), outbuf); + ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd, + src->fdset, outbuf); break; default: /* need to assert as buf == NULL */ @@ -323,22 +302,18 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc) gchar *ip; GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); - /* create the control sockets before anything */ - if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0) + if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) goto socket_pair; - fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); - /* create receiving client socket */ GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d", src->host, src->port); - if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) + if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) goto no_socket; GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d", - src->sock_fd); + src->sock_fd.fd); GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN); /* look up name if we need to */ @@ -355,7 +330,7 @@ gst_tcp_client_src_start (GstBaseSrc * bsrc) g_free (ip); GST_DEBUG_OBJECT (src, "connecting to server"); - ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin, + ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin, sizeof (src->server_sin)); if (ret) { @@ -406,10 +381,13 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc) src = GST_TCP_CLIENT_SRC (bsrc); GST_DEBUG_OBJECT (src, "closing socket"); - if (src->sock_fd != -1) { - close (src->sock_fd); - src->sock_fd = -1; + + if (src->fdset != NULL) { + gst_poll_free (src->fdset); + src->fdset = NULL; } + + gst_tcp_socket_close (&src->sock_fd); src->caps_received = FALSE; if (src->caps) { gst_caps_unref (src->caps); @@ -417,11 +395,6 @@ gst_tcp_client_src_stop (GstBaseSrc * bsrc) } GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN); - close (READ_SOCKET (src)); - close (WRITE_SOCKET (src)); - READ_SOCKET (src) = -1; - WRITE_SOCKET (src) = -1; - return TRUE; } @@ -431,7 +404,7 @@ gst_tcp_client_src_unlock (GstBaseSrc * bsrc) { GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); - SEND_COMMAND (src, CONTROL_STOP); + gst_poll_set_flushing (src->fdset, TRUE); return TRUE; } diff --git a/gst/tcp/gsttcpclientsrc.h b/gst/tcp/gsttcpclientsrc.h index d44054c661..24d31e8a82 100644 --- a/gst/tcp/gsttcpclientsrc.h +++ b/gst/tcp/gsttcpclientsrc.h @@ -64,8 +64,8 @@ struct _GstTCPClientSrc { struct sockaddr_in server_sin; /* socket */ - int sock_fd; - int control_fds[2]; + GstPollFD sock_fd; + GstPoll *fdset; GstTCPProtocol protocol; /* protocol used for reading data */ gboolean caps_received; /* if we have received caps yet */ diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index a2c782ee74..bc1001e476 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -63,7 +63,7 @@ enum static void gst_tcp_server_sink_finalize (GObject * gobject); static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, - GstFDSet * set); + GstPoll * set); static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this); static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this); static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd); @@ -184,11 +184,11 @@ gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd) } static gboolean -gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstFDSet * set) +gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set) { GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); - if (gst_fdset_fd_can_read (set, &this->server_sock)) { + if (gst_poll_fd_can_read (set, &this->server_sock)) { /* handle new client connection on server socket */ if (!gst_tcp_server_sink_handle_server_read (this)) { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), @@ -270,7 +270,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent) ret = 1; if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, (void *) &ret, sizeof (ret)) < 0) { - gst_tcp_socket_close (&this->server_sock.fd); + gst_tcp_socket_close (&this->server_sock); GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); return FALSE; @@ -279,7 +279,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent) ret = 1; if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, (void *) &ret, sizeof (ret)) < 0) { - gst_tcp_socket_close (&this->server_sock.fd); + gst_tcp_socket_close (&this->server_sock); GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); return FALSE; @@ -297,7 +297,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent) sizeof (this->server_sin)); if (ret) { - gst_tcp_socket_close (&this->server_sock.fd); + gst_tcp_socket_close (&this->server_sock); switch (errno) { default: GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), @@ -314,7 +314,7 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent) GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", this->server_sock.fd, TCP_BACKLOG); if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) { - gst_tcp_socket_close (&this->server_sock.fd); + gst_tcp_socket_close (&this->server_sock); GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; @@ -323,8 +323,8 @@ gst_tcp_server_sink_init_send (GstMultiFdSink * parent) "listened on server socket %d, returning from connection setup", this->server_sock.fd); - gst_fdset_add_fd (parent->fdset, &this->server_sock); - gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); + gst_poll_add_fd (parent->fdset, &this->server_sock); + gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); return TRUE; } @@ -335,7 +335,7 @@ gst_tcp_server_sink_close (GstMultiFdSink * parent) GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); if (this->server_sock.fd != -1) { - gst_fdset_remove_fd (parent->fdset, &this->server_sock); + gst_poll_remove_fd (parent->fdset, &this->server_sock); close (this->server_sock.fd); this->server_sock.fd = -1; diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index 99841945a5..ac8846da7a 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -76,7 +76,7 @@ struct _GstTCPServerSink { struct sockaddr_in server_sin; /* socket */ - GstFD server_sock; + GstPollFD server_sock; }; struct _GstTCPServerSinkClass { diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index b638f438a8..329a04b0f5 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -32,24 +32,6 @@ #include -/* control stuff stolen from fdsrc */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(o) o->control_fds -#define WRITE_SOCKET(o) o->control_fds[1] -#define READ_SOCKET(o) o->control_fds[0] - -#define SEND_COMMAND(o, command) \ -G_STMT_START { \ - unsigned char c; c = command; \ - write (WRITE_SOCKET(o), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(o, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(o), &command, 1); \ -} G_STMT_END - - GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug); #define GST_CAT_DEFAULT tcpserversrc_debug @@ -147,13 +129,10 @@ gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class) { src->server_port = TCP_DEFAULT_PORT; src->host = g_strdup (TCP_DEFAULT_HOST); - src->server_sock_fd = -1; - src->client_sock_fd = -1; + src->server_sock_fd.fd = -1; + src->client_sock_fd.fd = -1; src->protocol = GST_TCP_PROTOCOL_NONE; - READ_SOCKET (src) = -1; - WRITE_SOCKET (src) = -1; - GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); } @@ -172,8 +151,6 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPServerSrc *src; GstFlowReturn ret = GST_FLOW_OK; - fd_set testfds; - int maxfdp1; src = GST_TCP_SERVER_SRC (psrc); @@ -181,36 +158,33 @@ gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) goto wrong_state; restart: - /* do a blocking select on the socket */ - FD_ZERO (&testfds); - - /* always select on cancel socket */ - FD_SET (READ_SOCKET (src), &testfds); - - if (src->client_sock_fd >= 0) { + if (src->client_sock_fd.fd >= 0) { /* if we have a client, wait for read */ - FD_SET (src->client_sock_fd, &testfds); - maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1; + gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE); + gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE); } else { /* else wait on server socket for connections */ - FD_SET (src->server_sock_fd, &testfds); - maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1; + gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE); } /* no action (0) is an error too in our case */ - if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0) - goto select_error; - - if (FD_ISSET (READ_SOCKET (src), &testfds)) - goto select_cancelled; + if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) { + if (ret == -1 && errno == EBUSY) + goto select_cancelled; + else + goto select_error; + } /* if we have no client socket we can accept one now */ - if (src->client_sock_fd < 0) { - if (FD_ISSET (src->server_sock_fd, &testfds)) { - if ((src->client_sock_fd = - accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin, + if (src->client_sock_fd.fd < 0) { + if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) { + if ((src->client_sock_fd.fd = + accept (src->server_sock_fd.fd, + (struct sockaddr *) &src->client_sin, &src->client_sin_len)) == -1) goto accept_error; + + gst_poll_add_fd (src->fdset, &src->client_sock_fd); } /* and restart now to poll the socket. */ goto restart; @@ -220,8 +194,8 @@ restart: switch (src->protocol) { case GST_TCP_PROTOCOL_NONE: - ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, - READ_SOCKET (src), outbuf); + ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, + src->fdset, outbuf); break; case GST_TCP_PROTOCOL_GDP: @@ -229,8 +203,8 @@ restart: GstCaps *caps; gchar *string; - ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd, - READ_SOCKET (src), &caps); + ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd, + src->fdset, &caps); if (ret == GST_FLOW_WRONG_STATE) goto gdp_cancelled; @@ -246,8 +220,8 @@ restart: gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps); } - ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd, - READ_SOCKET (src), outbuf); + ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, + src->fdset, outbuf); if (ret == GST_FLOW_OK) gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src))); @@ -369,26 +343,19 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc) int ret; GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - /* create the control sockets before anything */ - if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0) - goto socket_pair; - - fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); - /* reset caps_received flag */ src->caps_received = FALSE; /* create the server listener socket */ - if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) + if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) goto socket_error; GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d", - src->server_sock_fd); + src->server_sock_fd.fd); /* make address reusable */ ret = 1; - if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret, + if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof (int)) < 0) goto sock_opt; @@ -408,16 +375,22 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc) /* bind it */ GST_DEBUG_OBJECT (src, "binding server socket to address"); - if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin, + if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin, sizeof (src->server_sin))) < 0) goto bind_error; GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d", - src->server_sock_fd, TCP_BACKLOG); + src->server_sock_fd.fd, TCP_BACKLOG); - if (listen (src->server_sock_fd, TCP_BACKLOG) == -1) + if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1) goto listen_error; + /* create an fdset to keep track of our file descriptors */ + if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) + goto socket_pair; + + gst_poll_add_fd (src->fdset, &src->server_sock_fd); + GST_DEBUG_OBJECT (src, "received client"); GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN); @@ -425,12 +398,6 @@ gst_tcp_server_src_start (GstBaseSrc * bsrc) return TRUE; /* ERRORS */ -socket_pair: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - return FALSE; - } socket_error: { GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); @@ -440,6 +407,7 @@ sock_opt: { GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); + gst_tcp_socket_close (&src->server_sock_fd); return FALSE; } host_error: @@ -465,6 +433,13 @@ listen_error: ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; } +socket_pair: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + GST_ERROR_SYSTEM); + gst_tcp_socket_close (&src->server_sock_fd); + return FALSE; + } } static gboolean @@ -472,20 +447,13 @@ gst_tcp_server_src_stop (GstBaseSrc * bsrc) { GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - if (src->server_sock_fd != -1) { - close (src->server_sock_fd); - src->server_sock_fd = -1; - } - if (src->client_sock_fd != -1) { - close (src->client_sock_fd); - src->client_sock_fd = -1; - } - GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); + gst_poll_free (src->fdset); + src->fdset = NULL; - close (READ_SOCKET (src)); - close (WRITE_SOCKET (src)); - READ_SOCKET (src) = -1; - WRITE_SOCKET (src) = -1; + gst_tcp_socket_close (&src->server_sock_fd); + gst_tcp_socket_close (&src->client_sock_fd); + + GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); return TRUE; } @@ -496,7 +464,7 @@ gst_tcp_server_src_unlock (GstBaseSrc * bsrc) { GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - SEND_COMMAND (src, CONTROL_STOP); + gst_poll_set_flushing (src->fdset, TRUE); return TRUE; } diff --git a/gst/tcp/gsttcpserversrc.h b/gst/tcp/gsttcpserversrc.h index 4e32f47de4..22c7afe278 100644 --- a/gst/tcp/gsttcpserversrc.h +++ b/gst/tcp/gsttcpserversrc.h @@ -65,14 +65,14 @@ struct _GstTCPServerSrc { int server_port; gchar *host; struct sockaddr_in server_sin; - int server_sock_fd; + GstPollFD server_sock_fd; /* client information */ struct sockaddr_in client_sin; socklen_t client_sin_len; - int client_sock_fd; + GstPollFD client_sock_fd; - int control_fds[2]; + GstPoll *fdset; GstTCPProtocol protocol; /* protocol used for reading data */ gboolean caps_received; /* if we have received caps yet */ -- 2.34.1