gst/tcp/: Added more locks around fdset structures. Fixed/reworked the poll array...
authorWim Taymans <wim.taymans@gmail.com>
Thu, 28 Oct 2004 14:22:15 +0000 (14:22 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 28 Oct 2004 14:22:15 +0000 (14:22 +0000)
Original commit message from CVS:
* gst/tcp/Makefile.am:
* gst/tcp/fdsetstress.c: (mess_some_more), (run_test), (main):
* gst/tcp/gstfdset.c: (nearest_pow), (resize), (ensure_size),
(gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode),
(gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
(gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
(gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
(gst_fdset_wait):
Added more locks around fdset structures. Fixed/reworked
the poll array resizing code.
Added stress test for fdset.

ChangeLog
gst/tcp/Makefile.am
gst/tcp/fdsetstress.c [new file with mode: 0644]
gst/tcp/gstfdset.c

index 7312e38..91d8aae 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,17 @@
+2004-10-28  Wim Taymans  <wim@fluendo.com>
+
+       * gst/tcp/Makefile.am:
+       * gst/tcp/fdsetstress.c: (mess_some_more), (run_test), (main):
+       * gst/tcp/gstfdset.c: (nearest_pow), (resize), (ensure_size),
+       (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode),
+       (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
+       (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
+       (gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
+       (gst_fdset_wait):
+       Added more locks around fdset structures. Fixed/reworked
+       the poll array resizing code.
+       Added stress test for fdset.
+
 2004-10-28  Zaheer Abbas Merali  <zaheerabbas at merali dot org>
 
        * gst-libs/gst/audio/gstaudiofilter.c: (gst_audiofilter_link):
index 82ccdc2..1662ad2 100644 (file)
@@ -42,3 +42,9 @@ CLEANFILES = $(BUILT_SOURCES)
 
 EXTRA_DIST = gsttcp-marshal.list
 
+noinst_PROGRAMS = fdsetstress
+
+fdsetstress_SOURCES = fdsetstress.c gstfdset.c 
+fdsetstress_CFLAGS = $(GST_CFLAGS) $(GTK_CFLAGS)
+fdsetstress_LDFLAGS = $(GST_LIBS) $(GTK_LIBS)
+
diff --git a/gst/tcp/fdsetstress.c b/gst/tcp/fdsetstress.c
new file mode 100644 (file)
index 0000000..8bc13e3
--- /dev/null
@@ -0,0 +1,179 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <stdlib.h>
+#include <gst/gst.h>
+
+#include "gstfdset.h"
+
+static GstFDSet *set;
+static GList *fds = NULL;
+static GMutex *fdlock;
+static GTimer *timer;
+
+#define MAX_THREADS  100
+
+static void
+mess_some_more ()
+{
+  GList *walk;
+  gint random;
+  gint removed = 0;
+
+  g_mutex_lock (fdlock);
+
+  for (walk = fds; walk;) {
+    GstFD *fd = (GstFD *) walk->data;
+
+    walk = g_list_next (walk);
+
+    random = (gint) (10.0 * rand () / (RAND_MAX + 1.0));
+    switch (random) {
+      case 0:
+      {
+        /*
+           GstFD *newfd = g_new0 (GstFD, 1);
+
+           gst_fdset_add_fd (set, newfd);
+           fds = g_list_prepend (fds, newfd);
+         */
+        break;
+      }
+      case 1:
+        if ((gint) (10.0 * rand () / (RAND_MAX + 1.0)) < 2) {
+          gst_fdset_remove_fd (set, fd);
+          fds = g_list_remove (fds, fd);
+          g_free (fd);
+          removed++;
+        }
+        break;
+
+      case 2:
+        gst_fdset_fd_ctl_write (set, fd, TRUE);
+        break;
+      case 3:
+        gst_fdset_fd_ctl_write (set, fd, FALSE);
+        break;
+      case 4:
+        gst_fdset_fd_ctl_read (set, fd, TRUE);
+        break;
+      case 5:
+        gst_fdset_fd_ctl_read (set, fd, FALSE);
+        break;
+
+      case 6:
+        gst_fdset_fd_has_closed (set, fd);
+        break;
+      case 7:
+        gst_fdset_fd_has_error (set, fd);
+        break;
+      case 8:
+        gst_fdset_fd_can_read (set, fd);
+        break;
+      case 9:
+        gst_fdset_fd_can_write (set, fd);
+        break;
+      default:
+        g_assert_not_reached ();
+        break;
+    }
+  }
+  if (g_list_length (fds) < 900) {
+    random = removed + (gint) (2.0 * rand () / (RAND_MAX + 1.0));
+    while (random) {
+      GstFD *newfd = g_new0 (GstFD, 1);
+
+      gst_fdset_add_fd (set, newfd);
+      fds = g_list_prepend (fds, newfd);
+      random--;
+    }
+  }
+
+  g_mutex_unlock (fdlock);
+}
+
+void *
+run_test (void *threadid)
+{
+  gint id = GPOINTER_TO_INT (threadid);
+
+  while (TRUE) {
+    if (id == 0) {
+      gint res = gst_fdset_wait (set, 10);
+
+      if (res < 0) {
+        g_print ("error %d %s\n", errno, g_strerror (errno));
+      }
+    } else {
+      mess_some_more ();
+      if (g_timer_elapsed (timer, NULL) > 0.5) {
+        g_mutex_lock (fdlock);
+        g_print ("active fds :%d\n", g_list_length (fds));
+        g_timer_start (timer);
+        g_mutex_unlock (fdlock);
+      }
+      g_usleep (1);
+    }
+  }
+
+  g_thread_exit (NULL);
+  return NULL;
+}
+
+gint
+main (gint argc, gchar * argv[])
+{
+  GThread *threads[MAX_THREADS];
+  gint num_threads;
+  gint t;
+
+  gst_init (&argc, &argv);
+
+  fdlock = g_mutex_new ();
+  timer = g_timer_new ();
+
+  if (argc != 2) {
+    g_print ("usage: %s <num_threads>\n", argv[0]);
+    exit (-1);
+  }
+
+  num_threads = atoi (argv[1]);
+
+  set = gst_fdset_new (GST_FDSET_MODE_POLL);
+
+  for (t = 0; t < num_threads; t++) {
+    GError *error = NULL;
+
+    threads[t] = g_thread_create (run_test, GINT_TO_POINTER (t), TRUE, &error);
+    if (error) {
+      printf ("ERROR: g_thread_create() %s\n", error->message);
+      exit (-1);
+    }
+  }
+  printf ("main(): Created %d threads.\n", t);
+
+  for (t = 0; t < num_threads; t++) {
+    g_thread_join (threads[t]);
+  }
+
+  gst_fdset_free (set);
+
+  return 0;
+}
index 64841a7..b75510d 100644 (file)
@@ -20,7 +20,7 @@
  * Boston, MA 02111-1307, USA.
  */
 
-#define MIN_POLLFDS    4096
+#define MIN_POLLFDS    32
 #define INIT_POLLFDS   MIN_POLLFDS
 
 #include <sys/poll.h>
@@ -72,26 +72,44 @@ struct _GstFDSet
 static gint
 nearest_pow (gint num)
 {
-  gint n = 1;
-
-  while (n < num)
-    n <<= 1;
+  /* hacker's delight page 48 */
+  num -= 1;
+  num |= num >> 1;
+  num |= num >> 2;
+  num |= num >> 4;
+  num |= num >> 8;
+  num |= num >> 16;
+
+  return num + 1;
+}
 
-  return n;
+/* resize a given pollfd array from old_size number of items
+ * to new_size number of items. Also initializes the new elements
+ * with the default values. */
+static struct pollfd *
+resize (struct pollfd *fds, gint old_size, gint new_size)
+{
+  struct pollfd *res;
+  gint i;
+
+  res = g_realloc (fds, new_size * sizeof (struct pollfd));
+  for (i = old_size; i < new_size; i++) {
+    res[i].fd = -1;
+    res[i].events = 0;
+    res[i].revents = 0;
+  }
+  return res;
 }
 
 static void
 ensure_size (GstFDSet * set, gint len)
 {
-  guint need = len * sizeof (struct pollfd);
-
-  if (need > set->size) {
-    need = nearest_pow (need);
-    need = MAX (need, MIN_POLLFDS * sizeof (struct pollfd));
+  if (len > set->size) {
+    len = nearest_pow (len);
+    len = MAX (len, MIN_POLLFDS);
 
-    set->pollfds = g_realloc (set->pollfds, need);
-
-    set->size = need;
+    set->pollfds = resize (set->pollfds, set->size, len);
+    set->size = len;
   }
 }
 
@@ -117,7 +135,7 @@ gst_fdset_new (GstFDSetMode mode)
       ensure_size (nset, MIN_POLLFDS);
       break;
     case GST_FDSET_MODE_EPOLL:
-      g_warning ("implement me");
+      g_warning ("implement EPOLL mode in GstFDSet");
       break;
     default:
       break;
@@ -138,7 +156,7 @@ gst_fdset_free (GstFDSet * set)
       g_mutex_free (set->poll_lock);
       break;
     case GST_FDSET_MODE_EPOLL:
-      g_warning ("implement me");
+      g_warning ("implement EPOLL mode in GstFDSet");
       break;
     default:
       break;
@@ -152,7 +170,7 @@ gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
 {
   g_return_if_fail (set != NULL);
 
-  g_warning ("implement me");
+  g_warning ("implement set_mode in GstFDSet");
 }
 
 GstFDSetMode
@@ -275,14 +293,22 @@ gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint events = set->pollfds[fd->idx].events;
+      gint idx;
 
-      if (active)
-        events |= POLLOUT;
-      else
-        events &= ~POLLOUT;
+      g_mutex_lock (set->poll_lock);
+
+      idx = fd->idx;
+      if (idx >= 0) {
+        gint events = set->pollfds[idx].events;
 
-      set->pollfds[fd->idx].events = events;
+        if (active)
+          events |= POLLOUT;
+        else
+          events &= ~POLLOUT;
+
+        set->pollfds[idx].events = events;
+      }
+      g_mutex_unlock (set->poll_lock);
       break;
     }
     case GST_FDSET_MODE_EPOLL:
@@ -305,14 +331,22 @@ gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint events = set->pollfds[fd->idx].events;
+      gint idx;
 
-      if (active)
-        events |= (POLLIN | POLLPRI);
-      else
-        events &= ~(POLLIN | POLLPRI);
+      g_mutex_lock (set->poll_lock);
+
+      idx = fd->idx;
+      if (idx >= 0) {
+        gint events = set->pollfds[idx].events;
 
-      set->pollfds[fd->idx].events = events;
+        if (active)
+          events |= (POLLIN | POLLPRI);
+        else
+          events &= ~(POLLIN | POLLPRI);
+
+        set->pollfds[idx].events = events;
+      }
+      g_mutex_unlock (set->poll_lock);
       break;
     }
     case GST_FDSET_MODE_EPOLL:
@@ -334,9 +368,11 @@ gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint idx = fd->idx;
+      gint idx;
 
       g_mutex_lock (set->poll_lock);
+      idx = fd->idx;
+
       if (idx >= 0 && idx < set->last_testpollfds) {
         res = (set->testpollfds[idx].revents & POLLHUP) != 0;
       }
@@ -363,9 +399,11 @@ gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint idx = fd->idx;
+      gint idx;
 
       g_mutex_lock (set->poll_lock);
+      idx = fd->idx;
+
       if (idx >= 0 && idx < set->last_testpollfds) {
         res = (set->testpollfds[idx].revents & (POLLERR | POLLNVAL)) != 0;
       }
@@ -392,9 +430,11 @@ gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint idx = fd->idx;
+      gint idx;
 
       g_mutex_lock (set->poll_lock);
+      idx = fd->idx;
+
       if (idx >= 0 && idx < set->last_testpollfds) {
         res = (set->testpollfds[idx].revents & (POLLIN | POLLPRI)) != 0;
       }
@@ -421,9 +461,11 @@ gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
       break;
     case GST_FDSET_MODE_POLL:
     {
-      gint idx = fd->idx;
+      gint idx;
 
       g_mutex_lock (set->poll_lock);
+      idx = fd->idx;
+
       if (idx >= 0 && idx < set->last_testpollfds) {
         res = (set->testpollfds[idx].revents & POLLOUT) != 0;
       }
@@ -469,7 +511,7 @@ gst_fdset_wait (GstFDSet * set, int timeout)
     {
       g_mutex_lock (set->poll_lock);
       if (set->testsize != set->size) {
-        set->testpollfds = g_realloc (set->testpollfds, set->size);
+        set->testpollfds = resize (set->testpollfds, set->testsize, set->size);
         set->testsize = set->size;
       }
       set->last_testpollfds = set->last_pollfds;