2004-11-23 Havoc Pennington <hp@redhat.com>
authorHavoc Pennington <hp@redhat.com>
Tue, 23 Nov 2004 06:21:12 +0000 (06:21 +0000)
committerHavoc Pennington <hp@redhat.com>
Tue, 23 Nov 2004 06:21:12 +0000 (06:21 +0000)
* test/glib/test-profile.c: modify to accept a plain_sockets
argument in which case it will bench plain sockets instead of
libdbus, for comparison purposes.

ChangeLog
test/glib/test-profile.c

index de52566..f85dd6b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2004-11-23  Havoc Pennington  <hp@redhat.com>
+
+       * test/glib/test-profile.c: modify to accept a plain_sockets
+       argument in which case it will bench plain sockets instead of
+       libdbus, for comparison purposes.
+
 2004-11-22  Havoc Pennington  <hp@redhat.com>
 
        * test/glib/test-profile.c (N_CLIENT_THREADS): run multiple
index a9c502d..2b7cb5b 100644 (file)
 #include <glib.h>
 #include <dbus/dbus-glib-lowlevel.h>
 #include <stdlib.h>
+#include <unistd.h>
 
-#define N_CLIENT_THREADS 4
-#define N_ITERATIONS 40000
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/stat.h>
+#ifndef HAVE_SOCKLEN_T
+#define socklen_t int
+#endif
+
+#define _DBUS_ZERO(object) (memset (&(object), '\0', sizeof ((object))))
+#define _DBUS_MAX_SUN_PATH_LENGTH 99
+
+/* Note that if you set threads > 1 you get a bogus profile since the
+ * clients start blocking on the server, so the client write() will go
+ * higher in the profile the larger the number of threads.
+ */
+#define N_CLIENT_THREADS 1
+#define N_ITERATIONS 150000
 #define PAYLOAD_SIZE 30
 #define ECHO_PATH "/org/freedesktop/EchoTest"
 #define ECHO_INTERFACE "org.freedesktop.EchoTest"
 #define ECHO_METHOD "EchoProfile"
 
-static const char *address;
+static const char *with_bus_address;
+static const char *plain_sockets_address;
 static unsigned char *payload;
+static int echo_call_size;
+static int echo_return_size;
 
 typedef struct
 {
@@ -49,6 +72,20 @@ typedef struct
   int n_clients;
 } ServerData;
 
+typedef struct
+{
+  const char *name;
+  void* (* init_server)        (ServerData *sd);
+  void  (* stop_server)        (ServerData *sd,
+                                void       *server);
+  void* (* client_thread_func) (void *data);
+
+  /* this is so different runs show up in the profiler with
+   * different backtrace
+   */
+  void  (* main_loop_run_func) (GMainLoop *loop);
+} ProfileRunVTable;
+
 static void
 send_echo_method_call (DBusConnection *connection)
 {
@@ -113,17 +150,17 @@ client_filter (DBusConnection     *connection,
 }
 
 static void*
-thread_func (void *data)
+with_bus_thread_func (void *data)
 {
   DBusError error;
   GMainContext *context;
   DBusConnection *connection;
   ClientData cd;
   
-  g_printerr ("Starting client thread\n");
+  g_printerr ("Starting client thread %p\n", g_thread_self());  
   
   dbus_error_init (&error);
-  connection = dbus_connection_open (address, &error);
+  connection = dbus_connection_open (with_bus_address, &error);
   if (connection == NULL)
     {
       g_printerr ("could not open connection: %s\n", error.message);
@@ -149,7 +186,8 @@ thread_func (void *data)
 
   g_printerr ("Client thread entering main loop\n");
   g_main_loop_run (cd.loop);
-  g_printerr ("Client thread exiting main loop\n");
+  g_printerr ("Client thread %p exiting main loop\n",
+              g_thread_self());
 
   dbus_connection_disconnect (connection);
   
@@ -206,18 +244,15 @@ new_connection_callback (DBusServer     *server,
   /* FIXME we leak the handler */  
 }
 
-int
-main (int argc, char *argv[])
+static void*
+with_bus_init_server (ServerData *sd)
 {
-  DBusError error;
   DBusServer *server;
-  GTimer *timer;
-  int i;
-  double secs;
-  ServerData sd;
+  DBusError error;
 
-  g_thread_init (NULL);
-  dbus_g_thread_init ();
+#ifndef DBUS_DISABLE_ASSERT
+  g_printerr ("You should probably --disable-asserts before you profile as they have noticeable overhead\n");
+#endif
 
   dbus_error_init (&error);
   server = dbus_server_listen ("unix:tmpdir="DBUS_TEST_SOCKET_DIR,
@@ -226,50 +261,491 @@ main (int argc, char *argv[])
     {
       g_printerr ("Could not start server: %s\n",
                   error.message);
-      return 1;
+      exit (1);
     }
 
-#ifndef DBUS_DISABLE_ASSERT
-  g_printerr ("You should probably turn off assertions before you profile\n");
-#endif
-  
-  address = dbus_server_get_address (server);
-  payload = g_malloc (PAYLOAD_SIZE);
+  with_bus_address = dbus_server_get_address (server);
   
   dbus_server_set_new_connection_function (server,
                                            new_connection_callback,
-                                           &sd, NULL);
+                                           sd, NULL);
+
+  dbus_server_setup_with_g_main (server, NULL);
+  
+  return server;
+}
+
+static void
+with_bus_stop_server (ServerData *sd,
+                      void       *server)
+{
+  g_printerr ("The following g_warning is because we try to call g_source_remove_poll() after g_source_destroy() in dbus-gmain.c, I think we need to add a source free func that clears out the watch/timeout funcs\n");
+  
+  dbus_server_unref (server);
+}
+
+static void
+with_bus_main_loop_run (GMainLoop *loop)
+{
+  g_main_loop_run (loop);
+}
+
+static const ProfileRunVTable with_bus_vtable = {
+  "with bus",
+  with_bus_init_server,
+  with_bus_stop_server,
+  with_bus_thread_func,
+  with_bus_main_loop_run
+};
+
+typedef struct
+{
+  int listen_fd;
+  ServerData *sd;
+  unsigned int source_id;
+} PlainSocketServer;
+
+static void
+read_and_drop_on_floor (int fd,
+                        int count)
+{
+  int bytes_read;
+  int val;
+  char buf[512];
+
+  bytes_read = 0;
+
+  while (bytes_read < count)
+    {
+    again:
+      
+      val = read (fd, buf, MIN (count - bytes_read, (int) sizeof(buf)));
+      
+      if (val < 0)
+        {
+          if (errno == EINTR)
+            goto again;
+          else
+            {
+              g_printerr ("read() failed thread %p: %s\n",
+                          g_thread_self(), strerror (errno));
+              exit (1);
+            }
+        }
+      else
+        {
+          bytes_read += val;
+        }
+    }
+
+#if 0
+  g_print ("%p read %d bytes from fd %d\n",
+           g_thread_self(), bytes_read, fd);
+#endif
+}
+
+static void
+write_junk (int fd,
+            int count)
+{
+  int bytes_written;
+  int val;
+  char buf[512];
+
+  bytes_written = 0;
+  
+  while (bytes_written < count)
+    {
+    again:
+      
+      val = write (fd, buf, MIN (count - bytes_written, (int) sizeof(buf)));
+      
+      if (val < 0)
+        {
+          if (errno == EINTR)
+            goto again;
+          else
+            {
+              g_printerr ("write() failed thread %p: %s\n",
+                          g_thread_self(), strerror (errno));
+              exit (1);
+            }
+        }
+      else
+        {
+          bytes_written += val;
+        }
+    }
+
+#if 0
+  g_print ("%p wrote %d bytes to fd %d\n",
+           g_thread_self(), bytes_written, fd);
+#endif
+}
+
+static gboolean
+plain_sockets_talk_to_client_watch (GIOChannel   *source,
+                                    GIOCondition  condition,
+                                    gpointer      data)
+{
+  PlainSocketServer *server = data;
+  int client_fd = g_io_channel_unix_get_fd (source);
+  
+  if (condition & G_IO_HUP)
+    {
+      g_printerr ("Client disconnected from server\n");
+      server->sd->n_clients -= 1;
+      if (server->sd->n_clients == 0)
+        g_main_loop_quit (server->sd->loop);
+
+      return FALSE; /* remove watch */
+    }
+  else if (condition & G_IO_IN)
+    {
+      server->sd->handled += 1;
+
+      read_and_drop_on_floor (client_fd, echo_call_size);
+      write_junk (client_fd, echo_return_size);
+    }
+  else
+    {
+      g_printerr ("Unexpected IO condition in server thread\n");
+      exit (1);
+    }
+
+  return TRUE;
+}
+
+static gboolean
+plain_sockets_new_client_watch (GIOChannel   *source,
+                                GIOCondition  condition,
+                                gpointer      data)
+{
+  int client_fd;
+  struct sockaddr addr;
+  socklen_t addrlen;
+  GIOChannel *channel;
+  PlainSocketServer *server = data;
+
+  if (!(condition & G_IO_IN))
+    {
+      g_printerr ("Unexpected IO condition on server socket\n");
+      exit (1);
+    }
+  
+  addrlen = sizeof (addr);
+  
+ retry:
+  client_fd = accept (server->listen_fd, &addr, &addrlen);
+  
+  if (client_fd < 0)
+    {
+      if (errno == EINTR)
+        goto retry;
+      else
+        {
+          g_printerr ("Failed to accept() connection from client: %s\n",
+                      strerror (errno));
+          exit (1);
+        }
+    }
+  
+  channel = g_io_channel_unix_new (client_fd);
+  g_io_add_watch (channel,
+                  G_IO_IN | G_IO_ERR | G_IO_HUP | G_IO_NVAL | G_IO_PRI,
+                  plain_sockets_talk_to_client_watch,
+                  server);
+  g_io_channel_unref (channel);
+
+  server->sd->n_clients += 1;
+  
+  return TRUE;
+}
+
+static void*
+plain_sockets_init_server (ServerData *sd)
+{
+  PlainSocketServer *server;
+  struct sockaddr_un addr;
+  static char path[] = "/tmp/dbus-test-profile-XXXXXX";
+  char *p;
+  GIOChannel *channel;
+
+  server = g_new0 (PlainSocketServer, 1);
+  server->sd = sd;
+  
+  p = path;
+  while (*p)
+    {
+      if (*p == 'X')
+        *p = 'a' + (int) (26.0*rand()/(RAND_MAX+1.0));
+      ++p;
+    }
+
+  g_print ("Socket is %s\n", path);
+  
+  server->listen_fd = socket (PF_UNIX, SOCK_STREAM, 0);
+  
+  if (server->listen_fd < 0)
+    {
+      g_printerr ("Failed to create socket: %s",
+                  strerror (errno));
+      exit (1);
+    }
+
+  _DBUS_ZERO (addr);
+  addr.sun_family = AF_UNIX;
+  
+#ifdef HAVE_ABSTRACT_SOCKETS
+  /* remember that abstract names aren't nul-terminated so we rely
+   * on sun_path being filled in with zeroes above.
+   */
+  addr.sun_path[0] = '\0'; /* this is what says "use abstract" */
+  strncpy (&addr.sun_path[1], path, _DBUS_MAX_SUN_PATH_LENGTH - 2);
+  /* _dbus_verbose_bytes (addr.sun_path, sizeof (addr.sun_path)); */
+#else /* HAVE_ABSTRACT_SOCKETS */
+  {
+    struct stat sb;
+    
+    if (stat (path, &sb) == 0 &&
+        S_ISSOCK (sb.st_mode))
+      unlink (path);
+  }
+
+  strncpy (addr.sun_path, path, _DBUS_MAX_SUN_PATH_LENGTH - 1);
+#endif /* ! HAVE_ABSTRACT_SOCKETS */
+  
+  if (bind (server->listen_fd, (struct sockaddr*) &addr, sizeof (addr)) < 0)
+    {
+      g_printerr ("Failed to bind socket \"%s\": %s",
+                  path, strerror (errno));
+      exit (1);
+    }
+
+  if (listen (server->listen_fd, 30 /* backlog */) < 0)
+    {
+      g_printerr ("Failed to listen on socket \"%s\": %s",
+                  path, strerror (errno));
+      exit (1);
+    }
+
+  plain_sockets_address = path;
+
+  channel = g_io_channel_unix_new (server->listen_fd);
+  server->source_id =
+    g_io_add_watch (channel,
+                    G_IO_IN | G_IO_ERR | G_IO_HUP | G_IO_NVAL | G_IO_PRI,
+                    plain_sockets_new_client_watch,
+                    server);
+  g_io_channel_unref (channel);
+  
+  return server;
+}
+
+static void
+plain_sockets_stop_server (ServerData *sd,
+                           void       *server_v)
+{
+  PlainSocketServer *server = server_v;
+
+  g_source_remove (server->source_id);
+  
+  close (server->listen_fd);
+  g_free (server);
+  
+  {
+    struct stat sb;
+    
+    if (stat (plain_sockets_address, &sb) == 0 &&
+        S_ISSOCK (sb.st_mode))
+      unlink (plain_sockets_address);
+  }
+}
+
+static gboolean
+plain_sockets_client_side_watch (GIOChannel   *source,
+                                 GIOCondition  condition,
+                                 gpointer      data)
+{
+  ClientData *cd = data;
+  int fd = g_io_channel_unix_get_fd (source);
+
+  if (condition & G_IO_IN)
+    {
+      read_and_drop_on_floor (fd, echo_return_size);
+    }
+  else if (condition & G_IO_OUT)
+    {
+      cd->iterations += 1;
+      if (cd->iterations >= N_ITERATIONS)
+        {
+          g_print ("Completed %d iterations\n", N_ITERATIONS);
+          g_main_loop_quit (cd->loop);
+        }
+      
+      write_junk (fd, echo_call_size);
+    }
+  else
+    {
+      g_printerr ("Unexpected IO condition in client thread\n");
+      exit (1);
+    }
+
+  return TRUE;
+}
+
+static void*
+plain_sockets_thread_func (void *data)
+{
+  GMainContext *context;
+  ClientData cd;
+  int fd;
+  struct sockaddr_un addr;
+  GIOChannel *channel;
+  GSource *gsource;
+  
+  g_printerr ("Starting client thread %p\n",
+              g_thread_self());
+  
+  fd = socket (PF_UNIX, SOCK_STREAM, 0);
+  
+  if (fd < 0)
+    {
+      g_printerr ("Failed to create socket: %s",
+                  strerror (errno)); 
+      exit (1);
+    }
+
+  _DBUS_ZERO (addr);
+  addr.sun_family = AF_UNIX;
+
+#ifdef HAVE_ABSTRACT_SOCKETS
+  /* remember that abstract names aren't nul-terminated so we rely
+   * on sun_path being filled in with zeroes above.
+   */
+  addr.sun_path[0] = '\0'; /* this is what says "use abstract" */
+  strncpy (&addr.sun_path[1], plain_sockets_address, _DBUS_MAX_SUN_PATH_LENGTH - 2);
+  /* _dbus_verbose_bytes (addr.sun_path, sizeof (addr.sun_path)); */
+#else /* HAVE_ABSTRACT_SOCKETS */
+  strncpy (addr.sun_path, plain_sockets_address, _DBUS_MAX_SUN_PATH_LENGTH - 1);
+#endif /* ! HAVE_ABSTRACT_SOCKETS */
+  
+  if (connect (fd, (struct sockaddr*) &addr, sizeof (addr)) < 0)
+    {      
+      g_printerr ("Failed to connect to socket %s: %s",
+                  plain_sockets_address, strerror (errno));
+      exit (1);
+    }
+
+  context = g_main_context_new ();
+
+  cd.iterations = 1;
+  cd.loop = g_main_loop_new (context, FALSE);
+
+  channel = g_io_channel_unix_new (fd);
+  
+  gsource = g_io_create_watch (channel,
+                               G_IO_IN | G_IO_OUT |
+                               G_IO_ERR | G_IO_HUP | G_IO_NVAL | G_IO_PRI);
+
+  g_source_set_callback (gsource,
+                         (GSourceFunc)plain_sockets_client_side_watch,
+                         &cd, NULL);
+
+  g_source_attach (gsource, context);
+
+  g_io_channel_unref (channel);
+
+  g_printerr ("Client thread writing to prime pingpong\n");
+  write_junk (fd, echo_call_size);
+  g_printerr ("Client thread done writing primer\n");
+
+  g_printerr ("Client thread entering main loop\n");
+  g_main_loop_run (cd.loop);
+  g_printerr ("Client thread %p exiting main loop\n",
+              g_thread_self());
+
+  g_source_destroy (gsource);
+  
+  close (fd);
+  
+  g_main_loop_unref (cd.loop);
+  g_main_context_unref (context);
+
+  return NULL;
+}
+
+static void
+plain_sockets_main_loop_run (GMainLoop *loop)
+{
+  g_main_loop_run (loop);
+}
+
+static const ProfileRunVTable plain_sockets_vtable = {
+  "plain sockets",
+  plain_sockets_init_server,
+  plain_sockets_stop_server,
+  plain_sockets_thread_func,
+  plain_sockets_main_loop_run
+};
+
+static void
+do_profile_run (const ProfileRunVTable *vtable)
+{
+  GTimer *timer;
+  int i;
+  double secs;
+  ServerData sd;
+  void *server;
 
   sd.handled = 0;
   sd.n_clients = 0;
   sd.loop = g_main_loop_new (NULL, FALSE);
 
-  dbus_server_setup_with_g_main (server, NULL);
+  server = (* vtable->init_server) (&sd);
   
   for (i = 0; i < N_CLIENT_THREADS; i++)
     {
-      g_thread_create (thread_func, NULL, FALSE, NULL);
+      g_thread_create (vtable->client_thread_func, NULL, FALSE, NULL);
     }
 
   timer = g_timer_new ();
   
-  g_printerr ("Server thread entering main loop\n");
-  g_main_loop_run (sd.loop);
-  g_printerr ("Server thread exiting main loop\n");
+  g_printerr ("Server thread %p entering main loop\n",
+              g_thread_self());
+  (* vtable->main_loop_run_func) (sd.loop);
+  g_printerr ("Server thread %p exiting main loop\n",
+              g_thread_self());
 
   secs = g_timer_elapsed (timer, NULL);
   g_timer_destroy (timer);
 
-  g_printerr ("%g seconds, %d round trips, %f seconds per pingpong\n",
-              secs, sd.handled, secs/sd.handled);
-#ifndef DBUS_DISABLE_ASSERT
-  g_printerr ("You should probably --disable-asserts before you profile as they have noticeable overhead\n");
-#endif
+  g_printerr ("%s: %g seconds, %d round trips, %f seconds per pingpong\n",
+              vtable->name, secs, sd.handled, secs/sd.handled);
 
-  g_printerr ("The following g_warning is because we try to call g_source_remove_poll() after g_source_destroy() in dbus-gmain.c, I think we need to add a source free func that clears out the watch/timeout funcs\n");
-  dbus_server_unref (server);
+  (* vtable->stop_server) (&sd, server);
   
   g_main_loop_unref (sd.loop);
+}
+
+int
+main (int argc, char *argv[])
+{
+  g_thread_init (NULL);
+  dbus_g_thread_init ();
+  
+  payload = g_malloc (PAYLOAD_SIZE);
+
+  /* The actual size of the DBusMessage on the wire, as of Nov 23 2004,
+   * without the payload
+   */
+  echo_call_size = 140;
+  echo_return_size = 32;
+
+  if (argc > 1 && strcmp (argv[1], "plain_sockets") == 0)
+    do_profile_run (&plain_sockets_vtable);
+  else
+    do_profile_run (&with_bus_vtable);
   
   return 0;
 }