+2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
+
+ Patch by: Peter Kjellerstedt <pkj at axis com>
+
+ * gst/tcp/Makefile.am:
+ * gst/tcp/fdsetstress.c:
+ * gst/tcp/gstfdset.c:
+ * gst/tcp/gstfdset.h:
+ Removed fdset and stress test, they are now known as GstPoll in
+ core.
+
+ * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_class_init),
+ (gst_multi_fd_sink_add_full), (gst_multi_fd_sink_remove),
+ (gst_multi_fd_sink_clear), (gst_multi_fd_sink_remove_client_link),
+ (gst_multi_fd_sink_handle_client_write),
+ (gst_multi_fd_sink_queue_buffer),
+ (gst_multi_fd_sink_handle_clients), (gst_multi_fd_sink_start),
+ (gst_multi_fd_sink_stop):
+ * gst/tcp/gstmultifdsink.h:
+ * gst/tcp/gsttcp.c: (gst_tcp_socket_read), (gst_tcp_socket_close),
+ (gst_tcp_read_buffer), (gst_tcp_gdp_read_buffer),
+ (gst_tcp_gdp_read_caps):
+ * gst/tcp/gsttcp.h:
+ * gst/tcp/gsttcpclientsink.c: (gst_tcp_client_sink_init),
+ (gst_tcp_client_sink_setcaps), (gst_tcp_client_sink_render),
+ (gst_tcp_client_sink_start), (gst_tcp_client_sink_stop):
+ * gst/tcp/gsttcpclientsink.h:
+ * gst/tcp/gsttcpclientsrc.c: (gst_tcp_client_src_init),
+ (gst_tcp_client_src_create), (gst_tcp_client_src_start),
+ (gst_tcp_client_src_stop), (gst_tcp_client_src_unlock):
+ * gst/tcp/gsttcpclientsrc.h:
+ * gst/tcp/gsttcpserversink.c: (gst_tcp_server_sink_handle_wait),
+ (gst_tcp_server_sink_init_send), (gst_tcp_server_sink_close):
+ * gst/tcp/gsttcpserversink.h:
+ * gst/tcp/gsttcpserversrc.c: (gst_tcp_server_src_init),
+ (gst_tcp_server_src_create), (gst_tcp_server_src_start),
+ (gst_tcp_server_src_stop), (gst_tcp_server_src_unlock):
+ * gst/tcp/gsttcpserversrc.h:
+ Port to GstPoll. See #505417.
+
2008-02-28 Wim Taymans <wim.taymans@collabora.co.uk>
Patch by: Peter Kjellerstedt <pkj at axis com>
libgsttcp_la_SOURCES = \
gsttcpplugin.c \
gsttcp.c \
- gstfdset.c \
gstmultifdsink.c \
gsttcpclientsrc.c gsttcpclientsink.c \
gsttcpserversrc.c gsttcpserversink.c
noinst_HEADERS = \
gsttcpplugin.h \
gsttcp.h \
- gstfdset.h \
gstmultifdsink.h \
gsttcpclientsrc.h gsttcpclientsink.h \
gsttcpserversrc.h gsttcpserversink.h
CLEANFILES = $(BUILT_SOURCES)
EXTRA_DIST = gsttcp-marshal.list
-
-noinst_PROGRAMS = fdsetstress
-
-fdsetstress_SOURCES = fdsetstress.c gstfdset.c
-fdsetstress_CFLAGS = $(GST_CFLAGS)
-fdsetstress_LDFLAGS = $(GST_LIBS)
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#include <stdlib.h>
-#include <gst/gst.h>
-
-#include "gstfdset.h"
-
-static GstFDSet *set;
-static GList *fds = NULL;
-static GMutex *fdlock;
-static GTimer *timer;
-
-#define MAX_THREADS 100
-
-static void
-mess_some_more ()
-{
- GList *walk;
- gint random;
- gint removed = 0;
-
- g_mutex_lock (fdlock);
-
- for (walk = fds; walk;) {
- GstFD *fd = (GstFD *) walk->data;
-
- walk = g_list_next (walk);
-
- random = (gint) (10.0 * rand () / (RAND_MAX + 1.0));
- switch (random) {
- case 0:
- {
- /*
- GstFD *newfd = g_new0 (GstFD, 1);
-
- gst_fdset_add_fd (set, newfd);
- fds = g_list_prepend (fds, newfd);
- */
- break;
- }
- case 1:
- if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) {
- gst_fdset_remove_fd (set, fd);
- fds = g_list_remove (fds, fd);
- g_free (fd);
- removed++;
- }
- break;
-
- case 2:
- gst_fdset_fd_ctl_write (set, fd, TRUE);
- break;
- case 3:
- gst_fdset_fd_ctl_write (set, fd, FALSE);
- break;
- case 4:
- gst_fdset_fd_ctl_read (set, fd, TRUE);
- break;
- case 5:
- gst_fdset_fd_ctl_read (set, fd, FALSE);
- break;
-
- case 6:
- gst_fdset_fd_has_closed (set, fd);
- break;
- case 7:
- gst_fdset_fd_has_error (set, fd);
- break;
- case 8:
- gst_fdset_fd_can_read (set, fd);
- break;
- case 9:
- gst_fdset_fd_can_write (set, fd);
- break;
- default:
- g_assert_not_reached ();
- break;
- }
- }
- if (g_list_length (fds) < 900) {
- random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0));
- while (random) {
- GstFD *newfd = g_new0 (GstFD, 1);
-
- gst_fdset_add_fd (set, newfd);
- fds = g_list_prepend (fds, newfd);
- random--;
- }
- }
-
- g_mutex_unlock (fdlock);
-}
-
-void *
-run_test (void *threadid)
-{
- gint id = GPOINTER_TO_INT (threadid);
-
- while (TRUE) {
- if (id == 0) {
- gint res = gst_fdset_wait (set, 10);
-
- if (res < 0) {
- g_print ("error %d %s\n", errno, g_strerror (errno));
- }
- } else {
- mess_some_more ();
- if (g_timer_elapsed (timer, NULL) > 0.5) {
- g_mutex_lock (fdlock);
- g_print ("active fds :%d\n", g_list_length (fds));
- g_timer_start (timer);
- g_mutex_unlock (fdlock);
- }
- g_usleep (1);
- }
- }
-
- g_thread_exit (NULL);
- return NULL;
-}
-
-gint
-main (gint argc, gchar * argv[])
-{
- GThread *threads[MAX_THREADS];
- gint num_threads;
- gint t;
-
- gst_init (&argc, &argv);
-
- fdlock = g_mutex_new ();
- timer = g_timer_new ();
-
- if (argc != 2) {
- g_print ("usage: %s <num_threads>\n", argv[0]);
- exit (-1);
- }
-
- num_threads = atoi (argv[1]);
-
- set = gst_fdset_new (GST_FDSET_MODE_POLL);
-
- for (t = 0; t < num_threads; t++) {
- GError *error = NULL;
-
- threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error);
- if (error) {
- printf ("ERROR: g_thread_create() %s\n", error->message);
- exit (-1);
- }
- }
- printf ("main(): Created %d threads.\n", t);
-
- for (t = 0; t < num_threads; t++) {
- g_thread_join (threads[t]);
- }
-
- gst_fdset_free (set);
-
- return 0;
-}
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
- *
- * gsttcpfdset.h: fdset datastructure
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#define MIN_POLLFDS 32
-#define INIT_POLLFDS MIN_POLLFDS
-
-#include <sys/poll.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <unistd.h>
-/* OS/X needs this because of bad headers */
-#include <string.h>
-
-#include "gstfdset.h"
-
-GType
-gst_fdset_mode_get_type (void)
-{
- static GType fdset_mode_type = 0;
- static const GEnumValue fdset_mode[] = {
- {GST_FDSET_MODE_SELECT, "Select", "select"},
- {GST_FDSET_MODE_POLL, "Poll", "poll"},
- {GST_FDSET_MODE_EPOLL, "EPoll", "epoll"},
- {0, NULL, NULL},
- };
-
- if (!fdset_mode_type) {
- fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode);
- }
- return fdset_mode_type;
-}
-
-struct _GstFDSet
-{
- GstFDSetMode mode;
-
- /* for poll */
- struct pollfd *testpollfds;
- gint last_testpollfds;
- gint testsize;
-
- struct pollfd *pollfds;
- gint size;
- gint free;
- gint last_pollfds;
- GMutex *poll_lock;
-
- /* for select */
- fd_set readfds, writefds; /* input */
- fd_set testreadfds, testwritefds; /* output */
-};
-
-static gint
-nearest_pow (gint num)
-{
- /* hacker's delight page 48 */
- num -= 1;
- num |= num >> 1;
- num |= num >> 2;
- num |= num >> 4;
- num |= num >> 8;
- num |= num >> 16;
-
- return num + 1;
-}
-
-/* resize a given pollfd array from old_size number of items
- * to new_size number of items. Also initializes the new elements
- * with the default values. */
-static struct pollfd *
-resize (struct pollfd *fds, gint old_size, gint new_size)
-{
- struct pollfd *res;
- gint i;
-
- res = g_realloc (fds, new_size * sizeof (struct pollfd));
- for (i = old_size; i < new_size; i++) {
- res[i].fd = -1;
- res[i].events = 0;
- res[i].revents = 0;
- }
- return res;
-}
-
-static void
-ensure_size (GstFDSet * set, gint len)
-{
- if (len > set->size) {
- len = nearest_pow (len);
- len = MAX (len, MIN_POLLFDS);
-
- set->pollfds = resize (set->pollfds, set->size, len);
- set->size = len;
- }
-}
-
-GstFDSet *
-gst_fdset_new (GstFDSetMode mode)
-{
- GstFDSet *nset;
-
- nset = g_new0 (GstFDSet, 1);
- nset->mode = mode;
-
- switch (mode) {
- case GST_FDSET_MODE_SELECT:
- FD_ZERO (&nset->readfds);
- FD_ZERO (&nset->writefds);
- break;
- case GST_FDSET_MODE_POLL:
- nset->pollfds = NULL;
- nset->testpollfds = NULL;
- nset->free = 0;
- nset->last_pollfds = 0;
- nset->poll_lock = g_mutex_new ();
- ensure_size (nset, MIN_POLLFDS);
- break;
- case GST_FDSET_MODE_EPOLL:
- g_warning ("implement EPOLL mode in GstFDSet");
- break;
- default:
- break;
- }
- return nset;
-}
-
-void
-gst_fdset_free (GstFDSet * set)
-{
- g_return_if_fail (set != NULL);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- break;
- case GST_FDSET_MODE_POLL:
- g_free (set->testpollfds);
- g_free (set->pollfds);
- g_mutex_free (set->poll_lock);
- break;
- case GST_FDSET_MODE_EPOLL:
- g_warning ("implement EPOLL mode in GstFDSet");
- break;
- default:
- break;
- }
- g_free (set);
-}
-
-
-void
-gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
-{
- g_return_if_fail (set != NULL);
-
- g_warning ("implement set_mode in GstFDSet");
-}
-
-GstFDSetMode
-gst_fdset_get_mode (GstFDSet * set)
-{
- g_return_val_if_fail (set != NULL, FALSE);
-
- return set->mode;
-}
-
-gboolean
-gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- res = TRUE;
- break;
- case GST_FDSET_MODE_POLL:
- {
- struct pollfd *nfd;
- gint idx;
-
- g_mutex_lock (set->poll_lock);
-
- ensure_size (set, set->last_pollfds + 1);
-
- idx = set->free;
- if (idx == -1) {
- /* find free space */
- while (idx < set->last_pollfds) {
- idx++;
- if (set->pollfds[idx].fd == -1)
- break;
- }
- }
- nfd = &set->pollfds[idx];
-
- nfd->fd = fd->fd;
- nfd->events = POLLERR | POLLNVAL | POLLHUP;
- nfd->revents = 0;
-
- /* see if we have one fd more */
- set->last_pollfds = MAX (idx + 1, set->last_pollfds);
- fd->idx = idx;
- set->free = -1;
- g_mutex_unlock (set->poll_lock);
-
- res = TRUE;
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-gboolean
-gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- /* nothing */
- FD_CLR (fd->fd, &set->writefds);
- FD_CLR (fd->fd, &set->readfds);
- res = TRUE;
- break;
- case GST_FDSET_MODE_POLL:
- {
- g_mutex_lock (set->poll_lock);
-
- /* FIXME on some platforms poll doesn't ignore the fd
- * when set to -1 */
- set->pollfds[fd->idx].fd = -1;
- set->pollfds[fd->idx].events = 0;
- set->pollfds[fd->idx].revents = 0;
-
- /* if we removed the last fd, we can lower the last_pollfds */
- if (fd->idx + 1 == set->last_pollfds) {
- set->last_pollfds--;
- }
- fd->idx = -1;
-
- if (set->free == -1) {
- set->free = fd->idx;
- } else {
- set->free = MIN (set->free, fd->idx);
- }
- g_mutex_unlock (set->poll_lock);
- res = TRUE;
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-void
-gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
-{
- g_return_if_fail (set != NULL);
- g_return_if_fail (fd != NULL);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- if (active)
- FD_SET (fd->fd, &set->writefds);
- else
- FD_CLR (fd->fd, &set->writefds);
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
-
- idx = fd->idx;
- if (idx >= 0) {
- gint events = set->pollfds[idx].events;
-
- if (active)
- events |= POLLOUT;
- else
- events &= ~POLLOUT;
-
- set->pollfds[idx].events = events;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
-}
-
-void
-gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
-{
- g_return_if_fail (set != NULL);
- g_return_if_fail (fd != NULL);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- if (active)
- FD_SET (fd->fd, &set->readfds);
- else
- FD_CLR (fd->fd, &set->readfds);
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
-
- idx = fd->idx;
- if (idx >= 0) {
- gint events = set->pollfds[idx].events;
-
- if (active)
- events |= (POLLIN | POLLPRI);
- else
- events &= ~(POLLIN | POLLPRI);
-
- set->pollfds[idx].events = events;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
-}
-
-gboolean
-gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- res = FALSE;
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
- idx = fd->idx;
-
- if (idx >= 0 && idx < set->last_testpollfds) {
- res = (set->testpollfds[idx].revents & POLLHUP) != 0;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-gboolean
-gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- res = FALSE;
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
- idx = fd->idx;
-
- if (idx >= 0 && idx < set->last_testpollfds) {
- res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-gboolean
-gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- res = FD_ISSET (fd->fd, &set->testreadfds);
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
- idx = fd->idx;
-
- if (idx >= 0 && idx < set->last_testpollfds) {
- res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-gboolean
-gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
-{
- gboolean res = FALSE;
-
- g_return_val_if_fail (set != NULL, FALSE);
- g_return_val_if_fail (fd != NULL, FALSE);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- res = FD_ISSET (fd->fd, &set->testwritefds);
- break;
- case GST_FDSET_MODE_POLL:
- {
- gint idx;
-
- g_mutex_lock (set->poll_lock);
- idx = fd->idx;
-
- if (idx >= 0 && idx < set->last_testpollfds) {
- res = (set->testpollfds[idx].revents & POLLOUT) != 0;
- }
- g_mutex_unlock (set->poll_lock);
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
- return res;
-}
-
-gint
-gst_fdset_wait (GstFDSet * set, int timeout)
-{
- int res = -1;
-
- g_return_val_if_fail (set != NULL, -1);
-
- switch (set->mode) {
- case GST_FDSET_MODE_SELECT:
- {
- struct timeval tv;
- struct timeval *tvptr;
-
- set->testreadfds = set->readfds;
- set->testwritefds = set->writefds;
-
- if (timeout > 0) {
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = timeout % 1000;
-
- tvptr = &tv;
- } else {
- tvptr = NULL;
- }
- res =
- select (FD_SETSIZE, &set->testreadfds, &set->testwritefds,
- (fd_set *) 0, tvptr);
- break;
- }
- case GST_FDSET_MODE_POLL:
- {
- g_mutex_lock (set->poll_lock);
- if (set->testsize != set->size) {
- set->testpollfds = resize (set->testpollfds, set->testsize, set->size);
- set->testsize = set->size;
- }
- set->last_testpollfds = set->last_pollfds;
- memcpy (set->testpollfds, set->pollfds,
- sizeof (struct pollfd) * set->last_testpollfds);
- g_mutex_unlock (set->poll_lock);
-
- res = poll (set->testpollfds, set->last_testpollfds, timeout);
-
- break;
- }
- case GST_FDSET_MODE_EPOLL:
- break;
- }
-
- return res;
-}
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- * Copyright (C) <2004> Wim Taymans <wim@fluendo.com>
- *
- * gstfdset.h: fdset datastructure
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifndef __GST_FDSET_H__
-#define __GST_FDSET_H__
-
-#include <gst/gst.h>
-
-G_BEGIN_DECLS
-
-typedef struct _GstFDSet GstFDSet;
-
-typedef struct {
- int fd;
- gint idx;
-} GstFD;
-
-typedef enum {
- GST_FDSET_MODE_SELECT,
- GST_FDSET_MODE_POLL,
- GST_FDSET_MODE_EPOLL
-} GstFDSetMode;
-
-#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type())
-GType gst_fdset_mode_get_type (void);
-
-
-GstFDSet* gst_fdset_new (GstFDSetMode mode);
-void gst_fdset_free (GstFDSet *set);
-
-void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode);
-GstFDSetMode gst_fdset_get_mode (GstFDSet *set);
-
-gboolean gst_fdset_add_fd (GstFDSet *set, GstFD *fd);
-gboolean gst_fdset_remove_fd (GstFDSet *set, GstFD *fd);
-
-void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active);
-void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active);
-
-gboolean gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd);
-gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd);
-gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd);
-gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd);
-
-gint gst_fdset_wait (GstFDSet *set, int timeout);
-
-G_END_DECLS
-
-#endif /* __GST_FDSET_H__ */
#define NOT_IMPLEMENTED 0
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock or restart the select call */
-#define CONTROL_RESTART 'R' /* restart the select call */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(sink) sink->control_sock
-#define WRITE_SOCKET(sink) sink->control_sock[1]
-#define READ_SOCKET(sink) sink->control_sock[0]
-
-#define SEND_COMMAND(sink, command) \
-G_STMT_START { \
- unsigned char c; c = command; \
- write (WRITE_SOCKET(sink).fd, &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(sink, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(sink).fd, &command, 1);\
-} G_STMT_END
-
/* elementfactory information */
static const GstElementDetails gst_multi_fd_sink_details =
GST_ELEMENT_DETAILS ("Multi filedescriptor sink",
/* this is really arbitrarily chosen */
#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
-#define DEFAULT_MODE GST_FDSET_MODE_POLL
+#define DEFAULT_MODE GST_POLL_MODE_AUTO
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
#define DEFAULT_TIME_MIN -1
GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_MODE,
g_param_spec_enum ("mode", "Mode",
- "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE,
+ "The mode for selecting activity on the fds", GST_TYPE_POLL_MODE,
DEFAULT_MODE, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX,
/* set the socket to non blocking */
res = fcntl (fd, F_SETFL, O_NONBLOCK);
/* we always read from a client */
- gst_fdset_add_fd (sink->fdset, &client->fd);
+ gst_poll_add_fd (sink->fdset, &client->fd);
/* we don't try to read from write only fds */
flags = fcntl (fd, F_GETFL, 0);
if ((flags & O_ACCMODE) != O_WRONLY) {
- gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
+ gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE);
}
/* figure out the mode, can't use send() for non sockets */
res = fstat (fd, &statbuf);
client->is_socket = TRUE;
}
- SEND_COMMAND (sink, CONTROL_RESTART);
+ gst_poll_restart (sink->fdset);
CLIENTS_UNLOCK (sink);
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clink);
- SEND_COMMAND (sink, CONTROL_RESTART);
+ gst_poll_restart (sink->fdset);
} else {
GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
}
client->status = GST_CLIENT_STATUS_REMOVED;
gst_multi_fd_sink_remove_client_link (sink, clients);
}
- SEND_COMMAND (sink, CONTROL_RESTART);
+ gst_poll_restart (sink->fdset);
CLIENTS_UNLOCK (sink);
}
break;
}
- gst_fdset_remove_fd (sink->fdset, &client->fd);
+ gst_poll_remove_fd (sink->fdset, &client->fd);
g_get_current_time (&now);
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
if (client->bufpos == -1) {
/* client is too fast, remove from write queue until new buffer is
* available */
- gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+ gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
/* if we flushed out all of the client buffers, we can stop */
if (client->flushcount == 0)
goto flushed;
client->bufpos = position;
} else {
/* cannot send data to this client yet */
- gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE);
+ gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE);
return TRUE;
}
}
} else if (client->bufpos == 0 || client->new_connection) {
/* can send data to this client now. need to signal the select thread that
* the fd_set changed */
- gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE);
+ gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE);
need_signal = TRUE;
}
/* keep track of maximum buffer usage */
/* and send a signal to thread if fd_set changed */
if (need_signal) {
- SEND_COMMAND (sink, CONTROL_RESTART);
+ gst_poll_restart (sink->fdset);
}
}
fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
do {
- gboolean stop = FALSE;
-
try_again = FALSE;
/* check for:
* - client socket input (ie, clients saying goodbye)
* - client socket output (ie, client reads) */
GST_LOG_OBJECT (sink, "waiting on action on fdset");
- result = gst_fdset_wait (sink->fdset, -1);
+ result = gst_poll_wait (sink->fdset, GST_CLOCK_TIME_NONE);
/* < 0 is an error, 0 just means a timeout happened, which is impossible */
if (result < 0) {
* are not valid */
try_again = TRUE;
} else if (errno == EINTR) {
- /* interrupted system call, just redo the select */
+ /* interrupted system call, just redo the wait */
try_again = TRUE;
+ } else if (errno == EBUSY) {
+ /* the call to gst_poll_wait() was flushed */
+ return;
} else {
/* this is quite bad... */
GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
}
} else {
GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result);
- /* read all commands */
- if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) {
- GST_LOG_OBJECT (sink, "have a command");
- while (TRUE) {
- gchar command;
- int res;
-
- READ_COMMAND (sink, command, res);
- if (res <= 0) {
- GST_LOG_OBJECT (sink, "no more commands");
- /* no more commands */
- break;
- }
-
- switch (command) {
- case CONTROL_RESTART:
- GST_LOG_OBJECT (sink, "restart");
- /* need to restart the select call as the fd_set changed */
- /* if other file descriptors than the READ_SOCKET had activity,
- * we don't restart just yet, but handle the other clients first
- */
- if (result == 1)
- try_again = TRUE;
- break;
- case CONTROL_STOP:
- /* break out of the select loop */
- GST_LOG_OBJECT (sink, "stop");
- /* stop this function */
- stop = TRUE;
- break;
- default:
- GST_WARNING_OBJECT (sink, "unkown");
- g_warning ("multifdsink: unknown control message received");
- break;
- }
- }
- }
- }
- if (stop) {
- return;
}
} while (try_again);
continue;
}
- if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) {
+ if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) {
client->status = GST_CLIENT_STATUS_CLOSED;
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
- if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) {
- GST_WARNING_OBJECT (sink, "gst_fdset_fd_has_error for %d", client->fd.fd);
+ if (gst_poll_fd_has_error (sink->fdset, &client->fd)) {
+ GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd);
client->status = GST_CLIENT_STATUS_ERROR;
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
- if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) {
+ if (gst_poll_fd_can_read (sink->fdset, &client->fd)) {
/* handle client read */
if (!gst_multi_fd_sink_handle_client_read (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
continue;
}
}
- if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) {
+ if (gst_poll_fd_can_write (sink->fdset, &client->fd)) {
/* handle client write */
if (!gst_multi_fd_sink_handle_client_write (sink, client)) {
gst_multi_fd_sink_remove_client_link (sink, clients);
gst_multi_fd_sink_start (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
- int control_socket[2];
GstMultiFdSink *this;
if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN))
fclass = GST_MULTI_FD_SINK_GET_CLASS (this);
GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
- this->fdset = gst_fdset_new (this->mode);
-
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
+ if ((this->fdset = gst_poll_new (this->mode, TRUE)) == NULL)
goto socket_pair;
- READ_SOCKET (this).fd = control_socket[0];
- WRITE_SOCKET (this).fd = control_socket[1];
-
- gst_fdset_add_fd (this->fdset, &READ_SOCKET (this));
- gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE);
-
- fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK);
-
this->streamheader = NULL;
this->bytes_to_serve = 0;
this->bytes_served = 0;
this->running = FALSE;
- SEND_COMMAND (this, CONTROL_STOP);
+ gst_poll_set_flushing (this->fdset, TRUE);
if (this->thread) {
GST_DEBUG_OBJECT (this, "joining thread");
g_thread_join (this->thread);
/* free the clients */
gst_multi_fd_sink_clear (this);
- close (READ_SOCKET (this).fd);
- close (WRITE_SOCKET (this).fd);
-
if (this->streamheader) {
g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
fclass->close (this);
if (this->fdset) {
- gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this));
- gst_fdset_free (this->fdset);
+ gst_poll_free (this->fdset);
this->fdset = NULL;
}
g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this);
G_BEGIN_DECLS
#include "gsttcp.h"
-#include "gstfdset.h"
#define GST_TYPE_MULTI_FD_SINK \
(gst_multi_fd_sink_get_type())
/* structure for a client
*/
typedef struct {
- GstFD fd;
+ GstPollFD fd;
gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the
GHashTable *fd_hash; /* index on fd to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */
- GstFDSetMode mode;
- GstFDSet *fdset;
-
- GstFD control_sock[2];/* sockets for controlling the select call */
+ GstPollMode mode;
+ GstPoll *fdset;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
/* vtable */
gboolean (*init) (GstMultiFdSink *sink);
- gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set);
+ gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set);
gboolean (*close) (GstMultiFdSink *sink);
void (*removed) (GstMultiFdSink *sink, int fd);
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
-#include <string.h> /* memset, in FD_ZERO macro */
#include <unistd.h>
#include <sys/ioctl.h>
*/
static GstFlowReturn
gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count,
- int cancel_fd)
+ GstPoll * fdset)
{
- fd_set testfds;
- int maxfdp1;
ssize_t n;
size_t bytes_read;
int num_to_read;
+ int ret;
bytes_read = 0;
while (bytes_read < count) {
/* do a blocking select on the socket */
- FD_ZERO (&testfds);
- FD_SET (socket, &testfds);
- if (cancel_fd >= 0)
- FD_SET (cancel_fd, &testfds);
- maxfdp1 = MAX (socket, cancel_fd) + 1;
-
/* no action (0) is an error too in our case */
- if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
- goto select_error;
-
- if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
- goto cancelled;
+ if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+ if (ret == -1 && errno == EBUSY)
+ goto cancelled;
+ else
+ goto select_error;
+ }
/* ask how much is available for reading on the socket */
if (ioctl (socket, FIONREAD, &num_to_read) < 0)
/* close the socket and reset the fd. Used to clean up after errors. */
void
-gst_tcp_socket_close (int *socket)
+gst_tcp_socket_close (GstPollFD * socket)
{
- close (*socket);
- *socket = -1;
+ if (socket->fd >= 0) {
+ close (socket->fd);
+ socket->fd = -1;
+ }
}
/* read a buffer from the given socket
* EOS
*/
GstFlowReturn
-gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd,
+gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
GstBuffer ** buf)
{
- fd_set testfds;
int ret;
- int maxfdp1;
ssize_t bytes_read;
int readsize;
*buf = NULL;
/* do a blocking select on the socket */
- FD_ZERO (&testfds);
- FD_SET (socket, &testfds);
- if (cancel_fd >= 0)
- FD_SET (cancel_fd, &testfds);
- maxfdp1 = MAX (socket, cancel_fd) + 1;
-
/* no action (0) is an error too in our case */
- if ((ret = select (maxfdp1, &testfds, NULL, NULL, 0)) <= 0)
- goto select_error;
-
- if (cancel_fd >= 0 && FD_ISSET (cancel_fd, &testfds))
- goto cancelled;
+ if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+ if (ret == -1 && errno == EBUSY)
+ goto cancelled;
+ else
+ goto select_error;
+ }
/* ask how much is available for reading on the socket */
if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0)
* EOS
*/
GstFlowReturn
-gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd,
+gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset,
GstBuffer ** buf)
{
GstFlowReturn ret;
*buf = NULL;
header = g_malloc (GST_DP_HEADER_LENGTH);
- ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
- cancel_fd);
+ ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
if (ret != GST_FLOW_OK)
goto header_read_error;
g_free (header);
ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf),
- GST_BUFFER_SIZE (*buf), cancel_fd);
+ GST_BUFFER_SIZE (*buf), fdset);
if (ret != GST_FLOW_OK)
goto data_read_error;
}
GstFlowReturn
-gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd,
+gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset,
GstCaps ** caps)
{
GstFlowReturn ret;
*caps = NULL;
header = g_malloc (GST_DP_HEADER_LENGTH);
- ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH,
- cancel_fd);
+ ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset);
if (ret != GST_FLOW_OK)
goto header_read_error;
"Reading %" G_GSIZE_FORMAT " bytes for caps packet payload",
payload_length);
- ret = gst_tcp_socket_read (this, socket, payload, payload_length, cancel_fd);
+ ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset);
if (ret != GST_FLOW_OK)
goto payload_read_error;
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
-void gst_tcp_socket_close (int *socket);
+void gst_tcp_socket_close (GstPollFD *socket);
-GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
+GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
-GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, int cancel_fd, GstBuffer **buf);
-GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, int cancel_fd, GstCaps **caps);
+GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
+GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps);
-GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, int cancel_fd);
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset);
gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
this->host = g_strdup (TCP_DEFAULT_HOST);
this->port = TCP_DEFAULT_PORT;
- this->sock_fd = -1;
+ this->sock_fd.fd = -1;
this->protocol = GST_TCP_PROTOCOL_NONE;
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
}
GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
g_free (string);
- if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
- TRUE, sink->host, sink->port))
+ if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd,
+ caps, TRUE, sink->host, sink->port))
goto gdp_write_error;
sink->caps_sent = TRUE;
break;
case GST_TCP_PROTOCOL_GDP:
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
- if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
+ if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf,
TRUE, sink->host, sink->port))
goto gdp_write_error;
break;
}
/* write buffer data */
- wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
+ wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size);
if (wrote < size)
goto write_error;
/* create sending client socket */
GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
this->port);
- if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+ if ((this->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d",
- this->sock_fd);
+ this->sock_fd.fd);
/* look up name if we need to */
ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
g_free (ip);
GST_DEBUG_OBJECT (this, "connecting to server");
- ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
+ ret = connect (this->sock_fd.fd, (struct sockaddr *) &this->server_sin,
sizeof (this->server_sin));
if (ret) {
if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
return TRUE;
- if (this->sock_fd != -1) {
- close (this->sock_fd);
- this->sock_fd = -1;
- }
+ gst_tcp_socket_close (&this->sock_fd);
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
struct sockaddr_in server_sin;
/* socket */
- int sock_fd;
+ GstPollFD sock_fd;
size_t data_written; /* how much bytes have we written ? */
GstTCPProtocol protocol; /* used with the protocol enum */
#include <fcntl.h>
-/* control stuff stolen from fdsrc */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(o) o->control_fds
-#define WRITE_SOCKET(o) o->control_fds[1]
-#define READ_SOCKET(o) o->control_fds[0]
-
-#define SEND_COMMAND(o, command) \
-G_STMT_START { \
- unsigned char c; c = command; \
- write (WRITE_SOCKET(o), &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(o, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(o), &command, 1); \
-} G_STMT_END
-
-
GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
#define GST_CAT_DEFAULT tcpclientsrc_debug
{
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
- this->sock_fd = -1;
+ this->sock_fd.fd = -1;
this->protocol = GST_TCP_PROTOCOL_NONE;
this->caps = NULL;
- READ_SOCKET (this) = -1;
- WRITE_SOCKET (this) = -1;
-
gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE:
- ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd,
- READ_SOCKET (src), outbuf);
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
+ src->fdset, outbuf);
break;
case GST_TCP_PROTOCOL_GDP:
GstCaps *caps;
GST_DEBUG_OBJECT (src, "getting caps through GDP");
- ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd,
- READ_SOCKET (src), &caps);
+ ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd,
+ src->fdset, &caps);
if (ret != GST_FLOW_OK)
goto no_caps;
src->caps = caps;
}
- ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd,
- READ_SOCKET (src), outbuf);
+ ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
+ src->fdset, outbuf);
break;
default:
/* need to assert as buf == NULL */
gchar *ip;
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
- /* create the control sockets before anything */
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
+ if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
goto socket_pair;
- fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-
/* create receiving client socket */
GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
src->host, src->port);
- if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+ if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto no_socket;
GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
- src->sock_fd);
+ src->sock_fd.fd);
GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
/* look up name if we need to */
g_free (ip);
GST_DEBUG_OBJECT (src, "connecting to server");
- ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
+ ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin));
if (ret) {
src = GST_TCP_CLIENT_SRC (bsrc);
GST_DEBUG_OBJECT (src, "closing socket");
- if (src->sock_fd != -1) {
- close (src->sock_fd);
- src->sock_fd = -1;
+
+ if (src->fdset != NULL) {
+ gst_poll_free (src->fdset);
+ src->fdset = NULL;
}
+
+ gst_tcp_socket_close (&src->sock_fd);
src->caps_received = FALSE;
if (src->caps) {
gst_caps_unref (src->caps);
}
GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
- close (READ_SOCKET (src));
- close (WRITE_SOCKET (src));
- READ_SOCKET (src) = -1;
- WRITE_SOCKET (src) = -1;
-
return TRUE;
}
{
GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
- SEND_COMMAND (src, CONTROL_STOP);
+ gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}
struct sockaddr_in server_sin;
/* socket */
- int sock_fd;
- int control_fds[2];
+ GstPollFD sock_fd;
+ GstPoll *fdset;
GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
static void gst_tcp_server_sink_finalize (GObject * gobject);
static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink,
- GstFDSet * set);
+ GstPoll * set);
static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this);
static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this);
static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd);
}
static gboolean
-gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstFDSet * set)
+gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set)
{
GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
- if (gst_fdset_fd_can_read (set, &this->server_sock)) {
+ if (gst_poll_fd_can_read (set, &this->server_sock)) {
/* handle new client connection on server socket */
if (!gst_tcp_server_sink_handle_server_read (this)) {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
ret = 1;
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR,
(void *) &ret, sizeof (ret)) < 0) {
- gst_tcp_socket_close (&this->server_sock.fd);
+ gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
ret = 1;
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE,
(void *) &ret, sizeof (ret)) < 0) {
- gst_tcp_socket_close (&this->server_sock.fd);
+ gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
sizeof (this->server_sin));
if (ret) {
- gst_tcp_socket_close (&this->server_sock.fd);
+ gst_tcp_socket_close (&this->server_sock);
switch (errno) {
default:
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
this->server_sock.fd, TCP_BACKLOG);
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) {
- gst_tcp_socket_close (&this->server_sock.fd);
+ gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
"listened on server socket %d, returning from connection setup",
this->server_sock.fd);
- gst_fdset_add_fd (parent->fdset, &this->server_sock);
- gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
+ gst_poll_add_fd (parent->fdset, &this->server_sock);
+ gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE);
return TRUE;
}
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
if (this->server_sock.fd != -1) {
- gst_fdset_remove_fd (parent->fdset, &this->server_sock);
+ gst_poll_remove_fd (parent->fdset, &this->server_sock);
close (this->server_sock.fd);
this->server_sock.fd = -1;
struct sockaddr_in server_sin;
/* socket */
- GstFD server_sock;
+ GstPollFD server_sock;
};
struct _GstTCPServerSinkClass {
#include <fcntl.h>
-/* control stuff stolen from fdsrc */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(o) o->control_fds
-#define WRITE_SOCKET(o) o->control_fds[1]
-#define READ_SOCKET(o) o->control_fds[0]
-
-#define SEND_COMMAND(o, command) \
-G_STMT_START { \
- unsigned char c; c = command; \
- write (WRITE_SOCKET(o), &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(o, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(o), &command, 1); \
-} G_STMT_END
-
-
GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug
{
src->server_port = TCP_DEFAULT_PORT;
src->host = g_strdup (TCP_DEFAULT_HOST);
- src->server_sock_fd = -1;
- src->client_sock_fd = -1;
+ src->server_sock_fd.fd = -1;
+ src->client_sock_fd.fd = -1;
src->protocol = GST_TCP_PROTOCOL_NONE;
- READ_SOCKET (src) = -1;
- WRITE_SOCKET (src) = -1;
-
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
}
{
GstTCPServerSrc *src;
GstFlowReturn ret = GST_FLOW_OK;
- fd_set testfds;
- int maxfdp1;
src = GST_TCP_SERVER_SRC (psrc);
goto wrong_state;
restart:
- /* do a blocking select on the socket */
- FD_ZERO (&testfds);
-
- /* always select on cancel socket */
- FD_SET (READ_SOCKET (src), &testfds);
-
- if (src->client_sock_fd >= 0) {
+ if (src->client_sock_fd.fd >= 0) {
/* if we have a client, wait for read */
- FD_SET (src->client_sock_fd, &testfds);
- maxfdp1 = MAX (src->client_sock_fd, READ_SOCKET (src)) + 1;
+ gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE);
+ gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE);
} else {
/* else wait on server socket for connections */
- FD_SET (src->server_sock_fd, &testfds);
- maxfdp1 = MAX (src->server_sock_fd, READ_SOCKET (src)) + 1;
+ gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE);
}
/* no action (0) is an error too in our case */
- if (select (maxfdp1, &testfds, NULL, NULL, 0) <= 0)
- goto select_error;
-
- if (FD_ISSET (READ_SOCKET (src), &testfds))
- goto select_cancelled;
+ if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) {
+ if (ret == -1 && errno == EBUSY)
+ goto select_cancelled;
+ else
+ goto select_error;
+ }
/* if we have no client socket we can accept one now */
- if (src->client_sock_fd < 0) {
- if (FD_ISSET (src->server_sock_fd, &testfds)) {
- if ((src->client_sock_fd =
- accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
+ if (src->client_sock_fd.fd < 0) {
+ if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) {
+ if ((src->client_sock_fd.fd =
+ accept (src->server_sock_fd.fd,
+ (struct sockaddr *) &src->client_sin,
&src->client_sin_len)) == -1)
goto accept_error;
+
+ gst_poll_add_fd (src->fdset, &src->client_sock_fd);
}
/* and restart now to poll the socket. */
goto restart;
switch (src->protocol) {
case GST_TCP_PROTOCOL_NONE:
- ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
- READ_SOCKET (src), outbuf);
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
+ src->fdset, outbuf);
break;
case GST_TCP_PROTOCOL_GDP:
GstCaps *caps;
gchar *string;
- ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd,
- READ_SOCKET (src), &caps);
+ ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd,
+ src->fdset, &caps);
if (ret == GST_FLOW_WRONG_STATE)
goto gdp_cancelled;
gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
}
- ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd,
- READ_SOCKET (src), outbuf);
+ ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
+ src->fdset, outbuf);
if (ret == GST_FLOW_OK)
gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
int ret;
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
- /* create the control sockets before anything */
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src)) < 0)
- goto socket_pair;
-
- fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-
/* reset caps_received flag */
src->caps_received = FALSE;
/* create the server listener socket */
- if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+ if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
goto socket_error;
GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
- src->server_sock_fd);
+ src->server_sock_fd.fd);
/* make address reusable */
ret = 1;
- if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+ if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret,
sizeof (int)) < 0)
goto sock_opt;
/* bind it */
GST_DEBUG_OBJECT (src, "binding server socket to address");
- if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
+ if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin,
sizeof (src->server_sin))) < 0)
goto bind_error;
GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
- src->server_sock_fd, TCP_BACKLOG);
+ src->server_sock_fd.fd, TCP_BACKLOG);
- if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
+ if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1)
goto listen_error;
+ /* create an fdset to keep track of our file descriptors */
+ if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
+ goto socket_pair;
+
+ gst_poll_add_fd (src->fdset, &src->server_sock_fd);
+
GST_DEBUG_OBJECT (src, "received client");
GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
return TRUE;
/* ERRORS */
-socket_pair:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
- GST_ERROR_SYSTEM);
- return FALSE;
- }
socket_error:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
{
GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
+ gst_tcp_socket_close (&src->server_sock_fd);
return FALSE;
}
host_error:
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
+socket_pair:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+ GST_ERROR_SYSTEM);
+ gst_tcp_socket_close (&src->server_sock_fd);
+ return FALSE;
+ }
}
static gboolean
{
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
- if (src->server_sock_fd != -1) {
- close (src->server_sock_fd);
- src->server_sock_fd = -1;
- }
- if (src->client_sock_fd != -1) {
- close (src->client_sock_fd);
- src->client_sock_fd = -1;
- }
- GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
+ gst_poll_free (src->fdset);
+ src->fdset = NULL;
- close (READ_SOCKET (src));
- close (WRITE_SOCKET (src));
- READ_SOCKET (src) = -1;
- WRITE_SOCKET (src) = -1;
+ gst_tcp_socket_close (&src->server_sock_fd);
+ gst_tcp_socket_close (&src->client_sock_fd);
+
+ GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
return TRUE;
}
{
GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
- SEND_COMMAND (src, CONTROL_STOP);
+ gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}
int server_port;
gchar *host;
struct sockaddr_in server_sin;
- int server_sock_fd;
+ GstPollFD server_sock_fd;
/* client information */
struct sockaddr_in client_sin;
socklen_t client_sin_len;
- int client_sock_fd;
+ GstPollFD client_sock_fd;
- int control_fds[2];
+ GstPoll *fdset;
GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */