From 2bb3268dd58b8de08c8cfedf979d7bbf85d74dfd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 28 Oct 2004 14:22:15 +0000 Subject: [PATCH] gst/tcp/: Added more locks around fdset structures. Fixed/reworked the poll array resizing code. Original commit message from CVS: * gst/tcp/Makefile.am: * gst/tcp/fdsetstress.c: (mess_some_more), (run_test), (main): * gst/tcp/gstfdset.c: (nearest_pow), (resize), (ensure_size), (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode), (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read), (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error), (gst_fdset_fd_can_read), (gst_fdset_fd_can_write), (gst_fdset_wait): Added more locks around fdset structures. Fixed/reworked the poll array resizing code. Added stress test for fdset. --- ChangeLog | 14 ++++ gst/tcp/Makefile.am | 6 ++ gst/tcp/fdsetstress.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++ gst/tcp/gstfdset.c | 110 +++++++++++++++++++++---------- 4 files changed, 275 insertions(+), 34 deletions(-) create mode 100644 gst/tcp/fdsetstress.c diff --git a/ChangeLog b/ChangeLog index 7312e38..91d8aae 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,17 @@ +2004-10-28 Wim Taymans + + * gst/tcp/Makefile.am: + * gst/tcp/fdsetstress.c: (mess_some_more), (run_test), (main): + * gst/tcp/gstfdset.c: (nearest_pow), (resize), (ensure_size), + (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode), + (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read), + (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error), + (gst_fdset_fd_can_read), (gst_fdset_fd_can_write), + (gst_fdset_wait): + Added more locks around fdset structures. Fixed/reworked + the poll array resizing code. + Added stress test for fdset. + 2004-10-28 Zaheer Abbas Merali * gst-libs/gst/audio/gstaudiofilter.c: (gst_audiofilter_link): diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index 82ccdc2..1662ad2 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -42,3 +42,9 @@ CLEANFILES = $(BUILT_SOURCES) EXTRA_DIST = gsttcp-marshal.list +noinst_PROGRAMS = fdsetstress + +fdsetstress_SOURCES = fdsetstress.c gstfdset.c +fdsetstress_CFLAGS = $(GST_CFLAGS) $(GTK_CFLAGS) +fdsetstress_LDFLAGS = $(GST_LIBS) $(GTK_LIBS) + diff --git a/gst/tcp/fdsetstress.c b/gst/tcp/fdsetstress.c new file mode 100644 index 0000000..8bc13e3 --- /dev/null +++ b/gst/tcp/fdsetstress.c @@ -0,0 +1,179 @@ +/* GStreamer + * Copyright (C) <1999> Erik Walthinsen + * Copyright (C) <2004> Thomas Vander Stichele + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include +#include + +#include "gstfdset.h" + +static GstFDSet *set; +static GList *fds = NULL; +static GMutex *fdlock; +static GTimer *timer; + +#define MAX_THREADS 100 + +static void +mess_some_more () +{ + GList *walk; + gint random; + gint removed = 0; + + g_mutex_lock (fdlock); + + for (walk = fds; walk;) { + GstFD *fd = (GstFD *) walk->data; + + walk = g_list_next (walk); + + random = (gint) (10.0 * rand () / (RAND_MAX + 1.0)); + switch (random) { + case 0: + { + /* + GstFD *newfd = g_new0 (GstFD, 1); + + gst_fdset_add_fd (set, newfd); + fds = g_list_prepend (fds, newfd); + */ + break; + } + case 1: + if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) { + gst_fdset_remove_fd (set, fd); + fds = g_list_remove (fds, fd); + g_free (fd); + removed++; + } + break; + + case 2: + gst_fdset_fd_ctl_write (set, fd, TRUE); + break; + case 3: + gst_fdset_fd_ctl_write (set, fd, FALSE); + break; + case 4: + gst_fdset_fd_ctl_read (set, fd, TRUE); + break; + case 5: + gst_fdset_fd_ctl_read (set, fd, FALSE); + break; + + case 6: + gst_fdset_fd_has_closed (set, fd); + break; + case 7: + gst_fdset_fd_has_error (set, fd); + break; + case 8: + gst_fdset_fd_can_read (set, fd); + break; + case 9: + gst_fdset_fd_can_write (set, fd); + break; + default: + g_assert_not_reached (); + break; + } + } + if (g_list_length (fds) < 900) { + random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0)); + while (random) { + GstFD *newfd = g_new0 (GstFD, 1); + + gst_fdset_add_fd (set, newfd); + fds = g_list_prepend (fds, newfd); + random--; + } + } + + g_mutex_unlock (fdlock); +} + +void * +run_test (void *threadid) +{ + gint id = GPOINTER_TO_INT (threadid); + + while (TRUE) { + if (id == 0) { + gint res = gst_fdset_wait (set, 10); + + if (res < 0) { + g_print ("error %d %s\n", errno, g_strerror (errno)); + } + } else { + mess_some_more (); + if (g_timer_elapsed (timer, NULL) > 0.5) { + g_mutex_lock (fdlock); + g_print ("active fds :%d\n", g_list_length (fds)); + g_timer_start (timer); + g_mutex_unlock (fdlock); + } + g_usleep (1); + } + } + + g_thread_exit (NULL); + return NULL; +} + +gint +main (gint argc, gchar * argv[]) +{ + GThread *threads[MAX_THREADS]; + gint num_threads; + gint t; + + gst_init (&argc, &argv); + + fdlock = g_mutex_new (); + timer = g_timer_new (); + + if (argc != 2) { + g_print ("usage: %s \n", argv[0]); + exit (-1); + } + + num_threads = atoi (argv[1]); + + set = gst_fdset_new (GST_FDSET_MODE_POLL); + + for (t = 0; t < num_threads; t++) { + GError *error = NULL; + + threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error); + if (error) { + printf ("ERROR: g_thread_create() %s\n", error->message); + exit (-1); + } + } + printf ("main(): Created %d threads.\n", t); + + for (t = 0; t < num_threads; t++) { + g_thread_join (threads[t]); + } + + gst_fdset_free (set); + + return 0; +} diff --git a/gst/tcp/gstfdset.c b/gst/tcp/gstfdset.c index 64841a7..b75510d 100644 --- a/gst/tcp/gstfdset.c +++ b/gst/tcp/gstfdset.c @@ -20,7 +20,7 @@ * Boston, MA 02111-1307, USA. */ -#define MIN_POLLFDS 4096 +#define MIN_POLLFDS 32 #define INIT_POLLFDS MIN_POLLFDS #include @@ -72,26 +72,44 @@ struct _GstFDSet static gint nearest_pow (gint num) { - gint n = 1; - - while (n < num) - n <<= 1; + /* hacker's delight page 48 */ + num -= 1; + num |= num >> 1; + num |= num >> 2; + num |= num >> 4; + num |= num >> 8; + num |= num >> 16; + + return num + 1; +} - return n; +/* resize a given pollfd array from old_size number of items + * to new_size number of items. Also initializes the new elements + * with the default values. */ +static struct pollfd * +resize (struct pollfd *fds, gint old_size, gint new_size) +{ + struct pollfd *res; + gint i; + + res = g_realloc (fds, new_size * sizeof (struct pollfd)); + for (i = old_size; i < new_size; i++) { + res[i].fd = -1; + res[i].events = 0; + res[i].revents = 0; + } + return res; } static void ensure_size (GstFDSet * set, gint len) { - guint need = len * sizeof (struct pollfd); - - if (need > set->size) { - need = nearest_pow (need); - need = MAX (need, MIN_POLLFDS * sizeof (struct pollfd)); + if (len > set->size) { + len = nearest_pow (len); + len = MAX (len, MIN_POLLFDS); - set->pollfds = g_realloc (set->pollfds, need); - - set->size = need; + set->pollfds = resize (set->pollfds, set->size, len); + set->size = len; } } @@ -117,7 +135,7 @@ gst_fdset_new (GstFDSetMode mode) ensure_size (nset, MIN_POLLFDS); break; case GST_FDSET_MODE_EPOLL: - g_warning ("implement me"); + g_warning ("implement EPOLL mode in GstFDSet"); break; default: break; @@ -138,7 +156,7 @@ gst_fdset_free (GstFDSet * set) g_mutex_free (set->poll_lock); break; case GST_FDSET_MODE_EPOLL: - g_warning ("implement me"); + g_warning ("implement EPOLL mode in GstFDSet"); break; default: break; @@ -152,7 +170,7 @@ gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode) { g_return_if_fail (set != NULL); - g_warning ("implement me"); + g_warning ("implement set_mode in GstFDSet"); } GstFDSetMode @@ -275,14 +293,22 @@ gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active) break; case GST_FDSET_MODE_POLL: { - gint events = set->pollfds[fd->idx].events; + gint idx; - if (active) - events |= POLLOUT; - else - events &= ~POLLOUT; + g_mutex_lock (set->poll_lock); + + idx = fd->idx; + if (idx >= 0) { + gint events = set->pollfds[idx].events; - set->pollfds[fd->idx].events = events; + if (active) + events |= POLLOUT; + else + events &= ~POLLOUT; + + set->pollfds[idx].events = events; + } + g_mutex_unlock (set->poll_lock); break; } case GST_FDSET_MODE_EPOLL: @@ -305,14 +331,22 @@ gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active) break; case GST_FDSET_MODE_POLL: { - gint events = set->pollfds[fd->idx].events; + gint idx; - if (active) - events |= (POLLIN | POLLPRI); - else - events &= ~(POLLIN | POLLPRI); + g_mutex_lock (set->poll_lock); + + idx = fd->idx; + if (idx >= 0) { + gint events = set->pollfds[idx].events; - set->pollfds[fd->idx].events = events; + if (active) + events |= (POLLIN | POLLPRI); + else + events &= ~(POLLIN | POLLPRI); + + set->pollfds[idx].events = events; + } + g_mutex_unlock (set->poll_lock); break; } case GST_FDSET_MODE_EPOLL: @@ -334,9 +368,11 @@ gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd) break; case GST_FDSET_MODE_POLL: { - gint idx = fd->idx; + gint idx; g_mutex_lock (set->poll_lock); + idx = fd->idx; + if (idx >= 0 && idx < set->last_testpollfds) { res = (set->testpollfds[idx].revents & POLLHUP) != 0; } @@ -363,9 +399,11 @@ gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd) break; case GST_FDSET_MODE_POLL: { - gint idx = fd->idx; + gint idx; g_mutex_lock (set->poll_lock); + idx = fd->idx; + if (idx >= 0 && idx < set->last_testpollfds) { res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0; } @@ -392,9 +430,11 @@ gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd) break; case GST_FDSET_MODE_POLL: { - gint idx = fd->idx; + gint idx; g_mutex_lock (set->poll_lock); + idx = fd->idx; + if (idx >= 0 && idx < set->last_testpollfds) { res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0; } @@ -421,9 +461,11 @@ gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd) break; case GST_FDSET_MODE_POLL: { - gint idx = fd->idx; + gint idx; g_mutex_lock (set->poll_lock); + idx = fd->idx; + if (idx >= 0 && idx < set->last_testpollfds) { res = (set->testpollfds[idx].revents & POLLOUT) != 0; } @@ -469,7 +511,7 @@ gst_fdset_wait (GstFDSet * set, int timeout) { g_mutex_lock (set->poll_lock); if (set->testsize != set->size) { - set->testpollfds = g_realloc (set->testpollfds, set->size); + set->testpollfds = resize (set->testpollfds, set->testsize, set->size); set->testsize = set->size; } set->last_testpollfds = set->last_pollfds; -- 2.7.4