2003-01-18 Havoc Pennington <hp@pobox.com>
[platform/upstream/dbus.git] / dbus / dbus-transport-unix.c
index be1ab44..ef50863 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: C; c-file-style: "gnu" -*- */
 /* dbus-transport-unix.c UNIX socket subclasses of DBusTransport
  *
- * Copyright (C) 2002  Red Hat Inc.
+ * Copyright (C) 2002, 2003  Red Hat Inc.
  *
  * Licensed under the Academic Free License version 1.2
  * 
@@ -51,7 +51,7 @@ struct DBusTransportUnix
 {
   DBusTransport base;                   /**< Parent instance */
   int fd;                               /**< File descriptor. */
-  DBusWatch *watch;                     /**< Watch for readability. */
+  DBusWatch *read_watch;                /**< Watch for readability. */
   DBusWatch *write_watch;               /**< Watch for writability. */
 
   int max_bytes_read_per_iteration;     /**< To avoid blocking too long. */
@@ -71,14 +71,14 @@ free_watches (DBusTransport *transport)
 {
   DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport;
   
-  if (unix_transport->watch)
+  if (unix_transport->read_watch)
     {
       if (transport->connection)
         _dbus_connection_remove_watch (transport->connection,
-                                       unix_transport->watch);
-      _dbus_watch_invalidate (unix_transport->watch);
-      _dbus_watch_unref (unix_transport->watch);
-      unix_transport->watch = NULL;
+                                       unix_transport->read_watch);
+      _dbus_watch_invalidate (unix_transport->read_watch);
+      _dbus_watch_unref (unix_transport->read_watch);
+      unix_transport->read_watch = NULL;
     }
 
   if (unix_transport->write_watch)
@@ -103,7 +103,7 @@ unix_finalize (DBusTransport *transport)
   
   _dbus_transport_finalize_base (transport);
 
-  _dbus_assert (unix_transport->watch == NULL);
+  _dbus_assert (unix_transport->read_watch == NULL);
   _dbus_assert (unix_transport->write_watch == NULL);
   
   dbus_free (transport);
@@ -138,7 +138,7 @@ check_write_watch (DBusTransport *transport)
 
       /* we can maybe add it some other time, just silently bomb */
       if (unix_transport->write_watch == NULL)
-        return;
+        goto out;
 
       if (!_dbus_connection_add_watch (transport->connection,
                                        unix_transport->write_watch))
@@ -158,6 +158,63 @@ check_write_watch (DBusTransport *transport)
       unix_transport->write_watch = NULL;
     }
 
+ out:
+  _dbus_transport_unref (transport);
+}
+
+static void
+check_read_watch (DBusTransport *transport)
+{
+  DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport;
+  dbus_bool_t need_read_watch;
+
+  if (transport->connection == NULL)
+    return;
+  
+  _dbus_transport_ref (transport);
+
+  need_read_watch =
+    _dbus_counter_get_value (transport->live_messages_size) < transport->max_live_messages_size;
+
+  if (transport->disconnected)
+    need_read_watch = FALSE;
+  
+  if (need_read_watch &&
+      unix_transport->read_watch == NULL)
+    {
+      _dbus_verbose ("Adding read watch to unix fd %d\n",
+                     unix_transport->fd);
+      
+      unix_transport->read_watch =
+        _dbus_watch_new (unix_transport->fd,
+                         DBUS_WATCH_READABLE);
+
+      /* we can maybe add it some other time, just silently bomb */
+      if (unix_transport->read_watch == NULL)
+        goto out;
+
+      if (!_dbus_connection_add_watch (transport->connection,
+                                       unix_transport->read_watch))
+        {
+          _dbus_watch_invalidate (unix_transport->read_watch);
+          _dbus_watch_unref (unix_transport->read_watch);
+          unix_transport->read_watch = NULL;
+        }
+    }
+  else if (!need_read_watch &&
+           unix_transport->read_watch != NULL)
+    {
+      _dbus_verbose ("Removing read watch from unix fd %d\n",
+                     unix_transport->fd);
+      
+      _dbus_connection_remove_watch (transport->connection,
+                                     unix_transport->read_watch);
+      _dbus_watch_invalidate (unix_transport->read_watch);
+      _dbus_watch_unref (unix_transport->read_watch);
+      unix_transport->read_watch = NULL;
+    }
+
+ out:
   _dbus_transport_unref (transport);
 }
 
@@ -178,7 +235,8 @@ queue_messages (DBusTransport *transport)
   while ((message = _dbus_message_loader_pop_message (transport->loader)))
     {
       _dbus_verbose ("queueing received message %p\n", message);
-      
+
+      _dbus_message_add_size_counter (message, transport->live_messages_size);
       _dbus_connection_queue_received_message (transport->connection,
                                                message);
       dbus_message_unref (message);
@@ -189,6 +247,9 @@ queue_messages (DBusTransport *transport)
       _dbus_verbose ("Corrupted message stream, disconnecting\n");
       do_io_error (transport);
     }
+
+  /* check read watch in case we've now exceeded max outstanding messages */
+  check_read_watch (transport);
 }
 
 /* return value is whether we successfully read any new data. */
@@ -526,6 +587,12 @@ do_writing (DBusTransport *transport)
                          total, unix_transport->max_bytes_written_per_iteration);
           goto out;
         }
+
+      if (unix_transport->write_watch == NULL)
+        {
+          _dbus_verbose ("write watch removed, not writing more stuff\n");
+          goto out;
+        }
       
       message = _dbus_connection_get_message_to_send (transport->connection);
       _dbus_assert (message != NULL);
@@ -650,6 +717,11 @@ do_reading (DBusTransport *transport)
   total = 0;
 
  again:
+
+  /* See if we've exceeded max messages and need to disable reading */
+  check_read_watch (transport);
+  if (unix_transport->read_watch == NULL)
+    return;
   
   if (total > unix_transport->max_bytes_read_per_iteration)
     {
@@ -761,7 +833,7 @@ unix_handle_watch (DBusTransport *transport,
 {
   DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport;
 
-  _dbus_assert (watch == unix_transport->watch ||
+  _dbus_assert (watch == unix_transport->read_watch ||
                 watch == unix_transport->write_watch);
   
   if (flags & (DBUS_WATCH_HANGUP | DBUS_WATCH_ERROR))
@@ -770,7 +842,7 @@ unix_handle_watch (DBusTransport *transport,
       return;
     }
   
-  if (watch == unix_transport->watch &&
+  if (watch == unix_transport->read_watch &&
       (flags & DBUS_WATCH_READABLE))
     {
       _dbus_verbose ("handling read watch\n");
@@ -800,30 +872,7 @@ unix_disconnect (DBusTransport *transport)
 static void
 unix_connection_set (DBusTransport *transport)
 {
-  DBusTransportUnix *unix_transport = (DBusTransportUnix*) transport;
-  DBusWatch *watch;
-
-  _dbus_assert (unix_transport->watch == NULL);
-  
-  watch = _dbus_watch_new (unix_transport->fd,
-                           DBUS_WATCH_READABLE);
-  
-  if (watch == NULL)
-    {
-      _dbus_transport_disconnect (transport);
-      return;
-    }
-  
-  if (!_dbus_connection_add_watch (transport->connection,
-                                   watch))
-    {
-      _dbus_transport_disconnect (transport);
-      _dbus_watch_unref (watch);
-      return;
-    }
-
-  unix_transport->watch = watch;
-
+  check_read_watch (transport);
   check_write_watch (transport);
 }
 
@@ -844,6 +893,11 @@ unix_do_iteration (DBusTransport *transport,
   fd_set read_set;
   fd_set write_set;
   dbus_bool_t do_select;
+
+  _dbus_verbose (" iteration flags = %s%s timeout = %d\n",
+                 flags & DBUS_ITERATION_DO_READING ? "read" : "",
+                 flags & DBUS_ITERATION_DO_WRITING ? "write" : "",
+                 timeout_milliseconds);
   
   /* "again" has to be up here because on EINTR the fd sets become
    * undefined
@@ -856,6 +910,9 @@ unix_do_iteration (DBusTransport *transport,
    * read/write messages, but regardless of those we may need to block
    * for reading/writing to do auth.  But if we do reading for auth,
    * we don't want to read any messages yet if not given DO_READING.
+   *
+   * Also, if read_watch == NULL or write_watch == NULL, we don't
+   * want to read/write so don't.
    */
 
   FD_ZERO (&read_set);
@@ -863,14 +920,15 @@ unix_do_iteration (DBusTransport *transport,
   
   if (_dbus_transport_get_is_authenticated (transport))
     {
-      if (flags & DBUS_ITERATION_DO_READING)
+      if (unix_transport->read_watch &&
+          (flags & DBUS_ITERATION_DO_READING))
         {
           FD_SET (unix_transport->fd, &read_set);
           do_select = TRUE;
         }
       
-      
-      if (flags & DBUS_ITERATION_DO_WRITING)
+      if (unix_transport->write_watch &&
+          (flags & DBUS_ITERATION_DO_WRITING))
         {
           FD_SET (unix_transport->fd, &write_set);
           do_select = TRUE;
@@ -957,13 +1015,21 @@ unix_do_iteration (DBusTransport *transport,
     }
 }
 
+static void
+unix_live_messages_changed (DBusTransport *transport)
+{
+  /* See if we should look for incoming messages again */
+  check_read_watch (transport);
+}
+
 static DBusTransportVTable unix_vtable = {
   unix_finalize,
   unix_handle_watch,
   unix_disconnect,
   unix_connection_set,
   unix_messages_pending,
-  unix_do_iteration
+  unix_do_iteration,
+  unix_live_messages_changed
 };
 
 /**
@@ -1009,6 +1075,7 @@ _dbus_transport_new_for_fd (int         fd,
   unix_transport->max_bytes_read_per_iteration = 2048;
   unix_transport->max_bytes_written_per_iteration = 2048;
 
+  check_read_watch ((DBusTransport*) unix_transport);
   check_write_watch ((DBusTransport*) unix_transport);
   
   return (DBusTransport*) unix_transport;