+2004-10-28 Wim Taymans <wim@fluendo.com>
+
+ * gst/tcp/Makefile.am:
+ * gst/tcp/fdsetstress.c: (mess_some_more), (run_test), (main):
+ * gst/tcp/gstfdset.c: (nearest_pow), (resize), (ensure_size),
+ (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode),
+ (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
+ (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
+ (gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
+ (gst_fdset_wait):
+ Added more locks around fdset structures. Fixed/reworked
+ the poll array resizing code.
+ Added stress test for fdset.
+
2004-10-28 Zaheer Abbas Merali <zaheerabbas at merali dot org>
* gst-libs/gst/audio/gstaudiofilter.c: (gst_audiofilter_link):
EXTRA_DIST = gsttcp-marshal.list
+noinst_PROGRAMS = fdsetstress
+
+fdsetstress_SOURCES = fdsetstress.c gstfdset.c
+fdsetstress_CFLAGS = $(GST_CFLAGS) $(GTK_CFLAGS)
+fdsetstress_LDFLAGS = $(GST_LIBS) $(GTK_LIBS)
+
--- /dev/null
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <stdlib.h>
+#include <gst/gst.h>
+
+#include "gstfdset.h"
+
+static GstFDSet *set;
+static GList *fds = NULL;
+static GMutex *fdlock;
+static GTimer *timer;
+
+#define MAX_THREADS 100
+
+static void
+mess_some_more ()
+{
+ GList *walk;
+ gint random;
+ gint removed = 0;
+
+ g_mutex_lock (fdlock);
+
+ for (walk = fds; walk;) {
+ GstFD *fd = (GstFD *) walk->data;
+
+ walk = g_list_next (walk);
+
+ random = (gint) (10.0 * rand () / (RAND_MAX + 1.0));
+ switch (random) {
+ case 0:
+ {
+ /*
+ GstFD *newfd = g_new0 (GstFD, 1);
+
+ gst_fdset_add_fd (set, newfd);
+ fds = g_list_prepend (fds, newfd);
+ */
+ break;
+ }
+ case 1:
+ if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) {
+ gst_fdset_remove_fd (set, fd);
+ fds = g_list_remove (fds, fd);
+ g_free (fd);
+ removed++;
+ }
+ break;
+
+ case 2:
+ gst_fdset_fd_ctl_write (set, fd, TRUE);
+ break;
+ case 3:
+ gst_fdset_fd_ctl_write (set, fd, FALSE);
+ break;
+ case 4:
+ gst_fdset_fd_ctl_read (set, fd, TRUE);
+ break;
+ case 5:
+ gst_fdset_fd_ctl_read (set, fd, FALSE);
+ break;
+
+ case 6:
+ gst_fdset_fd_has_closed (set, fd);
+ break;
+ case 7:
+ gst_fdset_fd_has_error (set, fd);
+ break;
+ case 8:
+ gst_fdset_fd_can_read (set, fd);
+ break;
+ case 9:
+ gst_fdset_fd_can_write (set, fd);
+ break;
+ default:
+ g_assert_not_reached ();
+ break;
+ }
+ }
+ if (g_list_length (fds) < 900) {
+ random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0));
+ while (random) {
+ GstFD *newfd = g_new0 (GstFD, 1);
+
+ gst_fdset_add_fd (set, newfd);
+ fds = g_list_prepend (fds, newfd);
+ random--;
+ }
+ }
+
+ g_mutex_unlock (fdlock);
+}
+
+void *
+run_test (void *threadid)
+{
+ gint id = GPOINTER_TO_INT (threadid);
+
+ while (TRUE) {
+ if (id == 0) {
+ gint res = gst_fdset_wait (set, 10);
+
+ if (res < 0) {
+ g_print ("error %d %s\n", errno, g_strerror (errno));
+ }
+ } else {
+ mess_some_more ();
+ if (g_timer_elapsed (timer, NULL) > 0.5) {
+ g_mutex_lock (fdlock);
+ g_print ("active fds :%d\n", g_list_length (fds));
+ g_timer_start (timer);
+ g_mutex_unlock (fdlock);
+ }
+ g_usleep (1);
+ }
+ }
+
+ g_thread_exit (NULL);
+ return NULL;
+}
+
+gint
+main (gint argc, gchar * argv[])
+{
+ GThread *threads[MAX_THREADS];
+ gint num_threads;
+ gint t;
+
+ gst_init (&argc, &argv);
+
+ fdlock = g_mutex_new ();
+ timer = g_timer_new ();
+
+ if (argc != 2) {
+ g_print ("usage: %s <num_threads>\n", argv[0]);
+ exit (-1);
+ }
+
+ num_threads = atoi (argv[1]);
+
+ set = gst_fdset_new (GST_FDSET_MODE_POLL);
+
+ for (t = 0; t < num_threads; t++) {
+ GError *error = NULL;
+
+ threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error);
+ if (error) {
+ printf ("ERROR: g_thread_create() %s\n", error->message);
+ exit (-1);
+ }
+ }
+ printf ("main(): Created %d threads.\n", t);
+
+ for (t = 0; t < num_threads; t++) {
+ g_thread_join (threads[t]);
+ }
+
+ gst_fdset_free (set);
+
+ return 0;
+}
* Boston, MA 02111-1307, USA.
*/
-#define MIN_POLLFDS 4096
+#define MIN_POLLFDS 32
#define INIT_POLLFDS MIN_POLLFDS
#include <sys/poll.h>
static gint
nearest_pow (gint num)
{
- gint n = 1;
-
- while (n < num)
- n <<= 1;
+ /* hacker's delight page 48 */
+ num -= 1;
+ num |= num >> 1;
+ num |= num >> 2;
+ num |= num >> 4;
+ num |= num >> 8;
+ num |= num >> 16;
+
+ return num + 1;
+}
- return n;
+/* resize a given pollfd array from old_size number of items
+ * to new_size number of items. Also initializes the new elements
+ * with the default values. */
+static struct pollfd *
+resize (struct pollfd *fds, gint old_size, gint new_size)
+{
+ struct pollfd *res;
+ gint i;
+
+ res = g_realloc (fds, new_size * sizeof (struct pollfd));
+ for (i = old_size; i < new_size; i++) {
+ res[i].fd = -1;
+ res[i].events = 0;
+ res[i].revents = 0;
+ }
+ return res;
}
static void
ensure_size (GstFDSet * set, gint len)
{
- guint need = len * sizeof (struct pollfd);
-
- if (need > set->size) {
- need = nearest_pow (need);
- need = MAX (need, MIN_POLLFDS * sizeof (struct pollfd));
+ if (len > set->size) {
+ len = nearest_pow (len);
+ len = MAX (len, MIN_POLLFDS);
- set->pollfds = g_realloc (set->pollfds, need);
-
- set->size = need;
+ set->pollfds = resize (set->pollfds, set->size, len);
+ set->size = len;
}
}
ensure_size (nset, MIN_POLLFDS);
break;
case GST_FDSET_MODE_EPOLL:
- g_warning ("implement me");
+ g_warning ("implement EPOLL mode in GstFDSet");
break;
default:
break;
g_mutex_free (set->poll_lock);
break;
case GST_FDSET_MODE_EPOLL:
- g_warning ("implement me");
+ g_warning ("implement EPOLL mode in GstFDSet");
break;
default:
break;
{
g_return_if_fail (set != NULL);
- g_warning ("implement me");
+ g_warning ("implement set_mode in GstFDSet");
}
GstFDSetMode
break;
case GST_FDSET_MODE_POLL:
{
- gint events = set->pollfds[fd->idx].events;
+ gint idx;
- if (active)
- events |= POLLOUT;
- else
- events &= ~POLLOUT;
+ g_mutex_lock (set->poll_lock);
+
+ idx = fd->idx;
+ if (idx >= 0) {
+ gint events = set->pollfds[idx].events;
- set->pollfds[fd->idx].events = events;
+ if (active)
+ events |= POLLOUT;
+ else
+ events &= ~POLLOUT;
+
+ set->pollfds[idx].events = events;
+ }
+ g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
case GST_FDSET_MODE_POLL:
{
- gint events = set->pollfds[fd->idx].events;
+ gint idx;
- if (active)
- events |= (POLLIN | POLLPRI);
- else
- events &= ~(POLLIN | POLLPRI);
+ g_mutex_lock (set->poll_lock);
+
+ idx = fd->idx;
+ if (idx >= 0) {
+ gint events = set->pollfds[idx].events;
- set->pollfds[fd->idx].events = events;
+ if (active)
+ events |= (POLLIN | POLLPRI);
+ else
+ events &= ~(POLLIN | POLLPRI);
+
+ set->pollfds[idx].events = events;
+ }
+ g_mutex_unlock (set->poll_lock);
break;
}
case GST_FDSET_MODE_EPOLL:
break;
case GST_FDSET_MODE_POLL:
{
- gint idx = fd->idx;
+ gint idx;
g_mutex_lock (set->poll_lock);
+ idx = fd->idx;
+
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLHUP) != 0;
}
break;
case GST_FDSET_MODE_POLL:
{
- gint idx = fd->idx;
+ gint idx;
g_mutex_lock (set->poll_lock);
+ idx = fd->idx;
+
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
}
break;
case GST_FDSET_MODE_POLL:
{
- gint idx = fd->idx;
+ gint idx;
g_mutex_lock (set->poll_lock);
+ idx = fd->idx;
+
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
}
break;
case GST_FDSET_MODE_POLL:
{
- gint idx = fd->idx;
+ gint idx;
g_mutex_lock (set->poll_lock);
+ idx = fd->idx;
+
if (idx >= 0 && idx < set->last_testpollfds) {
res = (set->testpollfds[idx].revents & POLLOUT) != 0;
}
{
g_mutex_lock (set->poll_lock);
if (set->testsize != set->size) {
- set->testpollfds = g_realloc (set->testpollfds, set->size);
+ set->testpollfds = resize (set->testpollfds, set->testsize, set->size);
set->testsize = set->size;
}
set->last_testpollfds = set->last_pollfds;