2003-03-16 Havoc Pennington <hp@pobox.com>
authorHavoc Pennington <hp@redhat.com>
Sun, 16 Mar 2003 20:16:47 +0000 (20:16 +0000)
committerHavoc Pennington <hp@redhat.com>
Sun, 16 Mar 2003 20:16:47 +0000 (20:16 +0000)
* dbus/dbus-string.c (_dbus_string_validate_utf8): oops, unbreak
this. always run the test suite before commit...

* bus/*: adapt to DBusConnection API changes

* glib/dbus-gmain.c: adapt to DBusConnection API changes,
requires renaming stuff to avoid dbus_connection_dispatch name
conflict.

* dbus/dbus-transport.c (_dbus_transport_queue_messages): new
function

* dbus/dbus-message.c (_dbus_message_loader_queue_messages):
separate from _dbus_message_loader_return_buffer()

* dbus/dbus-connection.c (dbus_connection_get_n_messages): remove
this, because it's now always broken to use; the number of
messages in queue vs. the number still buffered by the message
loader is undefined/meaningless. Should use
dbus_connection_get_dispatch_state().
(dbus_connection_dispatch): rename from
dbus_connection_dispatch_message

19 files changed:
ChangeLog
bus/connection.c
bus/dispatch.c
bus/utils.c
bus/utils.h
dbus/dbus-bus.c
dbus/dbus-connection.c
dbus/dbus-connection.h
dbus/dbus-memory.c
dbus/dbus-mempool.c
dbus/dbus-message-handler.c
dbus/dbus-message-internal.h
dbus/dbus-message.c
dbus/dbus-server.c
dbus/dbus-string.c
dbus/dbus-transport-unix.c
dbus/dbus-transport.c
dbus/dbus-transport.h
glib/dbus-gmain.c

index 7143d13..9189b71 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,30 @@
 2003-03-16  Havoc Pennington  <hp@pobox.com>
 
+       * dbus/dbus-string.c (_dbus_string_validate_utf8): oops, unbreak
+       this. always run the test suite before commit...
+
+       * bus/*: adapt to DBusConnection API changes
+
+       * glib/dbus-gmain.c: adapt to DBusConnection API changes, 
+       requires renaming stuff to avoid dbus_connection_dispatch name 
+       conflict.
+
+       * dbus/dbus-transport.c (_dbus_transport_queue_messages): new
+       function
+
+       * dbus/dbus-message.c (_dbus_message_loader_queue_messages):
+       separate from _dbus_message_loader_return_buffer()
+
+       * dbus/dbus-connection.c (dbus_connection_get_n_messages): remove
+       this, because it's now always broken to use; the number of
+       messages in queue vs. the number still buffered by the message
+       loader is undefined/meaningless. Should use
+       dbus_connection_get_dispatch_state().
+       (dbus_connection_dispatch): rename from
+       dbus_connection_dispatch_message
+
+2003-03-16  Havoc Pennington  <hp@pobox.com>
+
        * dbus/dbus-string.c (_dbus_string_validate_utf8): copy in a real
        implementation
 
index 3308df0..2169d8a 100644 (file)
@@ -141,8 +141,8 @@ connection_watch_callback (DBusWatch     *watch,
   
   dbus_connection_handle_watch (connection, watch, condition);
 
-  while (dbus_connection_dispatch_message (connection))
-    ;
+  bus_connection_dispatch_all_messages (connection);
+  
   dbus_connection_unref (connection);
 }
 
@@ -171,8 +171,8 @@ connection_timeout_callback (DBusTimeout   *timeout,
   
   dbus_timeout_handle (timeout);
 
-  while (dbus_connection_dispatch_message (connection))
-    ;
+  bus_connection_dispatch_all_messages (connection);
+  
   dbus_connection_unref (connection);
 }
 
index 0fe4be1..3c96d70 100644 (file)
@@ -514,7 +514,7 @@ kill_client_connection (BusContext     *context,
   _dbus_assert (bus_test_client_listed (connection));
   
   /* Run disconnect handler in test.c */
-  if (dbus_connection_dispatch_message (connection))
+  if (bus_connection_dispatch_one_message (connection))
     _dbus_assert_not_reached ("something received on connection being killed other than the disconnect");
   
   _dbus_assert (!dbus_connection_get_is_connected (connection));
@@ -859,7 +859,7 @@ check_hello_connection (BusContext *context)
       dbus_connection_ref (connection);
       dbus_connection_disconnect (connection);
       /* dispatching disconnect handler will unref once */
-      if (dbus_connection_dispatch_message (connection))
+      if (bus_connection_dispatch_one_message (connection))
         _dbus_assert_not_reached ("message other than disconnect dispatched after failure to register");
       dbus_connection_unref (connection);
       _dbus_assert (!bus_test_client_listed (connection));
@@ -967,19 +967,19 @@ bus_dispatch_test (const DBusString *test_data_dir)
                          check_hello_connection);
 
   dbus_connection_disconnect (foo);
-  if (dbus_connection_dispatch_message (foo))
+  if (bus_connection_dispatch_one_message (foo))
     _dbus_assert_not_reached ("extra message in queue");
   dbus_connection_unref (foo);
   _dbus_assert (!bus_test_client_listed (foo));
 
   dbus_connection_disconnect (bar);
-  if (dbus_connection_dispatch_message (bar))
+  if (bus_connection_dispatch_one_message (bar))
     _dbus_assert_not_reached ("extra message in queue");
   dbus_connection_unref (bar);
   _dbus_assert (!bus_test_client_listed (bar));
 
   dbus_connection_disconnect (baz);
-  if (dbus_connection_dispatch_message (baz))
+  if (bus_connection_dispatch_one_message (baz))
     _dbus_assert_not_reached ("extra message in queue");
   dbus_connection_unref (baz);
   _dbus_assert (!bus_test_client_listed (baz));
index fadfc14..8a68d8a 100644 (file)
@@ -39,3 +39,20 @@ bus_wait_for_memory (void)
 #endif
 }
 
+void
+bus_connection_dispatch_all_messages (DBusConnection *connection)
+{
+  while (bus_connection_dispatch_one_message (connection))
+    ;
+}
+
+dbus_bool_t
+bus_connection_dispatch_one_message  (DBusConnection *connection)
+{
+  DBusDispatchStatus status;
+
+  while ((status = dbus_connection_dispatch (connection)) == DBUS_DISPATCH_NEED_MEMORY)
+    bus_wait_for_memory ();
+  
+  return status == DBUS_DISPATCH_DATA_REMAINS;
+}
index 41eb555..968ece3 100644 (file)
 #ifndef BUS_UTILS_H
 #define BUS_UTILS_H
 
+#include <dbus/dbus.h>
+
 void bus_wait_for_memory (void);
 
 extern const char bus_no_memory_message[];
 #define BUS_SET_OOM(error) dbus_set_error ((error), DBUS_ERROR_NO_MEMORY, bus_no_memory_message)
 
+void        bus_connection_dispatch_all_messages (DBusConnection *connection);
+dbus_bool_t bus_connection_dispatch_one_message  (DBusConnection *connection);
+
 #endif /* BUS_ACTIVATION_H */
index 3e40925..8443488 100644 (file)
@@ -49,7 +49,7 @@
  */
 typedef struct
 {
-  char *base_service;
+  char *base_service; /**< Base service name of this connection */
 
 } BusData;
 
@@ -244,7 +244,7 @@ dbus_bus_register (DBusConnection *connection,
  * once per connection.
  *
  * @param connection the connection
- * @param the base service name
+ * @param base_service the base service name
  * @returns #FALSE if not enough memory
  */
 dbus_bool_t
index 79f27a8..2b4a760 100644 (file)
@@ -76,8 +76,8 @@ struct DBusConnection
 
   DBusMutex *mutex; /**< Lock on the entire DBusConnection */
 
-  dbus_bool_t dispatch_acquired; /**< Protects dispatch_message */
-  DBusCondVar *dispatch_cond;    /**< Protects dispatch_message */
+  dbus_bool_t dispatch_acquired; /**< Protects dispatch() */
+  DBusCondVar *dispatch_cond;    /**< Protects dispatch() */
 
   dbus_bool_t io_path_acquired;  /**< Protects transport io path */
   DBusCondVar *io_path_cond;     /**< Protects transport io path */
@@ -126,8 +126,9 @@ typedef struct
 
 static void reply_handler_data_free (ReplyHandlerData *data);
 
-static void _dbus_connection_remove_timeout_locked (DBusConnection *connection,
-                                                   DBusTimeout    *timeout);
+static void               _dbus_connection_remove_timeout_locked        (DBusConnection *connection,
+                                                                         DBusTimeout    *timeout);
+static DBusDispatchStatus _dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection);
 
 /**
  * Acquires the connection lock.
@@ -359,7 +360,7 @@ _dbus_connection_remove_watch (DBusConnection *connection,
  * function on a watch that was not previously added.
  *
  * @param connection the connection.
- * @param timeout the timeout to toggle.
+ * @param watch the watch to toggle.
  * @param enabled whether to enable or disable
  */
 void
@@ -1355,6 +1356,32 @@ dbus_connection_send_with_reply (DBusConnection     *connection,
   return TRUE;
 }
 
+
+static DBusMessage*
+check_for_reply_unlocked (DBusConnection *connection,
+                          dbus_int32_t    client_serial)
+{
+  DBusList *link;
+  
+  link = _dbus_list_get_first_link (&connection->incoming_messages);
+
+  while (link != NULL)
+    {
+      DBusMessage *reply = link->data;
+
+      if (dbus_message_get_reply_serial (reply) == client_serial)
+       {
+         _dbus_list_remove_link (&connection->incoming_messages, link);
+         connection->n_incoming  -= 1;
+         dbus_message_ref (reply);
+         return reply;
+       }
+      link = _dbus_list_get_next_link (&connection->incoming_messages, link);
+    }
+
+  return NULL;
+}
+
 /**
  * Sends a message and blocks a certain time period while waiting for a reply.
  * This function does not dispatch any message handlers until the main loop
@@ -1383,11 +1410,11 @@ dbus_connection_send_with_reply_and_block (DBusConnection     *connection,
                                            DBusError          *error)
 {
   dbus_int32_t client_serial;
-  DBusList *link;
   long start_tv_sec, start_tv_usec;
   long end_tv_sec, end_tv_usec;
   long tv_sec, tv_usec;
-
+  DBusDispatchStatus status;
+  
   if (timeout_milliseconds == -1)
     timeout_milliseconds = DEFAULT_TIMEOUT_VALUE;
 
@@ -1423,35 +1450,32 @@ dbus_connection_send_with_reply_and_block (DBusConnection     *connection,
                  end_tv_sec, end_tv_usec);
   
   /* Now we wait... */
-  /* THREAD TODO: This is busted. What if a dispatch_message or pop_message
+  /* THREAD TODO: This is busted. What if a dispatch() or pop_message
    * gets the message before we do?
    */
- block_again:  
-  
+  /* always block at least once as we know we don't have the reply yet */
   _dbus_connection_do_iteration (connection,
-                                DBUS_ITERATION_DO_READING |
-                                DBUS_ITERATION_BLOCK,
-                                timeout_milliseconds);
+                                 DBUS_ITERATION_DO_READING |
+                                 DBUS_ITERATION_BLOCK,
+                                 timeout_milliseconds);
 
-  /* Check if we've gotten a reply */
-  link = _dbus_list_get_first_link (&connection->incoming_messages);
+ recheck_status:
 
-  while (link != NULL)
-    {
-      DBusMessage *reply = link->data;
+  /* queue messages and get status */
+  status = _dbus_connection_get_dispatch_status_unlocked (connection);
 
-      if (dbus_message_get_reply_serial (reply) == client_serial)
-       {
-         _dbus_list_remove_link (&connection->incoming_messages, link);
-         connection->n_incoming  -= 1;
-         dbus_message_ref (reply);
-         
-         dbus_mutex_unlock (connection->mutex);
-         return reply;
-       }
-      link = _dbus_list_get_next_link (&connection->incoming_messages, link);
+  if (status == DBUS_DISPATCH_DATA_REMAINS)
+    {
+      DBusMessage *reply;
+      
+      reply = check_for_reply_unlocked (connection, client_serial);
+      if (reply != NULL)
+        {
+          dbus_mutex_unlock (connection->mutex);
+          return reply;
+        }
     }
-
+  
   _dbus_get_current_time (&tv_sec, &tv_usec);
   
   if (tv_sec < start_tv_sec)
@@ -1466,7 +1490,29 @@ dbus_connection_send_with_reply_and_block (DBusConnection     *connection,
       _dbus_verbose ("%d milliseconds remain\n", timeout_milliseconds);
       _dbus_assert (timeout_milliseconds > 0);
       
-      goto block_again; /* not expired yet */
+      if (status == DBUS_DISPATCH_NEED_MEMORY)
+        {
+          /* Try sleeping a bit, as we aren't sure we need to block for reading,
+           * we may already have a reply in the buffer and just can't process
+           * it.
+           */
+          if (timeout_milliseconds < 100)
+            ; /* just busy loop */
+          else if (timeout_milliseconds <= 1000)
+            _dbus_sleep_milliseconds (timeout_milliseconds / 3);
+          else
+            _dbus_sleep_milliseconds (1000);
+        }
+      else
+        {          
+          /* block again, we don't have the reply buffered yet. */
+          _dbus_connection_do_iteration (connection,
+                                         DBUS_ITERATION_DO_READING |
+                                         DBUS_ITERATION_BLOCK,
+                                         timeout_milliseconds);
+        }
+
+      goto recheck_status;
     }
   
   if (dbus_connection_get_is_connected (connection))
@@ -1503,24 +1549,6 @@ dbus_connection_flush (DBusConnection *connection)
   dbus_mutex_unlock (connection->mutex);
 }
 
-/**
- * Gets the number of messages in the incoming message queue.
- *
- * @param connection the connection.
- * @returns the number of messages in the queue.
- */
-int
-dbus_connection_get_n_messages (DBusConnection *connection)
-{
-  int res;
-
-  dbus_mutex_lock (connection->mutex);
-  res = connection->n_incoming;
-  dbus_mutex_unlock (connection->mutex);
-  return res;
-}
-
-
 /* Call with mutex held. Will drop it while waiting and re-acquire
  * before returning
  */
@@ -1551,7 +1579,15 @@ DBusMessage*
 dbus_connection_borrow_message  (DBusConnection *connection)
 {
   DBusMessage *message;
-
+  DBusDispatchStatus status;
+  
+  /* this is called for the side effect that it queues
+   * up any messages from the transport
+   */
+  status = dbus_connection_get_dispatch_status (connection);
+  if (status != DBUS_DISPATCH_DATA_REMAINS)
+    return NULL;
+  
   dbus_mutex_lock (connection->mutex);
 
   if (connection->message_borrowed != NULL)
@@ -1672,8 +1708,17 @@ DBusMessage*
 dbus_connection_pop_message (DBusConnection *connection)
 {
   DBusMessage *message;
-  dbus_mutex_lock (connection->mutex);
+  DBusDispatchStatus status;
 
+  /* this is called for the side effect that it queues
+   * up any messages from the transport
+   */
+  status = dbus_connection_get_dispatch_status (connection);
+  if (status != DBUS_DISPATCH_DATA_REMAINS)
+    return NULL;
+  
+  dbus_mutex_lock (connection->mutex);
+  
   message = _dbus_connection_pop_message_unlocked (connection);
   
   dbus_mutex_unlock (connection->mutex);
@@ -1724,15 +1769,62 @@ _dbus_connection_failed_pop (DBusConnection *connection,
   connection->n_incoming += 1;
 }
 
+static DBusDispatchStatus
+_dbus_connection_get_dispatch_status_unlocked (DBusConnection *connection)
+{
+  if (connection->n_incoming > 0)
+    return DBUS_DISPATCH_DATA_REMAINS;
+  else if (!_dbus_transport_queue_messages (connection->transport))
+    return DBUS_DISPATCH_NEED_MEMORY;
+  else
+    {
+      DBusDispatchStatus status;
+      
+      status = _dbus_transport_get_dispatch_status (connection->transport);
+
+      if (status != DBUS_DISPATCH_COMPLETE)
+        return status;
+      else if (connection->n_incoming > 0)
+        return DBUS_DISPATCH_DATA_REMAINS;
+      else
+        return DBUS_DISPATCH_COMPLETE;
+    }
+}
+
+/**
+ * Gets the current state (what we would currently return
+ * from dbus_connection_dispatch()) but doesn't actually
+ * dispatch any messages.
+ * 
+ * @param connection the connection.
+ * @returns current dispatch status
+ */
+DBusDispatchStatus
+dbus_connection_get_dispatch_status (DBusConnection *connection)
+{
+  DBusDispatchStatus status;
+  
+  dbus_mutex_lock (connection->mutex);
+
+  status = _dbus_connection_get_dispatch_status_unlocked (connection);
+  
+  dbus_mutex_unlock (connection->mutex);
+
+  return status;
+}
+
 /**
- * Pops the first-received message from the current incoming message
- * queue, runs any handlers for it, then unrefs the message.
+ * Processes data buffered while handling watches, queueing zero or
+ * more incoming messages. Then pops the first-received message from
+ * the current incoming message queue, runs any handlers for it, and
+ * unrefs the message. Returns a status indicating whether messages/data
+ * remain, more memory is needed, or all data has been processed.
  *
  * @param connection the connection
- * @returns #TRUE if the queue is not empty after dispatch
+ * @returns dispatch status
  */
-dbus_bool_t
-dbus_connection_dispatch_message (DBusConnection *connection)
+DBusDispatchStatus
+dbus_connection_dispatch (DBusConnection *connection)
 {
   DBusMessageHandler *handler;
   DBusMessage *message;
@@ -1741,9 +1833,14 @@ dbus_connection_dispatch_message (DBusConnection *connection)
   ReplyHandlerData *reply_handler_data;
   const char *name;
   dbus_int32_t reply_serial;
+  DBusDispatchStatus status;
   
-  dbus_mutex_lock (connection->mutex);
+  status = dbus_connection_get_dispatch_status (connection);
+  if (status != DBUS_DISPATCH_DATA_REMAINS)
+    return status;
 
+  dbus_mutex_lock (connection->mutex);
+  
   /* We need to ref the connection since the callback could potentially
    * drop the last ref to it
    */
@@ -1753,17 +1850,23 @@ dbus_connection_dispatch_message (DBusConnection *connection)
   
   /* This call may drop the lock during the execution (if waiting for
    * borrowed messages to be returned) but the order of message
-   * dispatch if several threads call dispatch_message is still
+   * dispatch if several threads call dispatch() is still
    * protected by the lock, since only one will get the lock, and that
    * one will finish the message dispatching
    */
   message_link = _dbus_connection_pop_message_link_unlocked (connection);
   if (message_link == NULL)
     {
+      /* another thread dispatched our stuff */
+
       _dbus_connection_release_dispatch (connection);
       dbus_mutex_unlock (connection->mutex);
+
+      status = dbus_connection_get_dispatch_status (connection);
+
       dbus_connection_unref (connection);
-      return FALSE;
+
+      return status;
     }
 
   message = message_link->data;
@@ -1780,14 +1883,14 @@ dbus_connection_dispatch_message (DBusConnection *connection)
       dbus_mutex_unlock (connection->mutex);
       _dbus_connection_failed_pop (connection, message_link);
       dbus_connection_unref (connection);
-      return FALSE;
+      return DBUS_DISPATCH_NEED_MEMORY;
     }
   
   _dbus_list_foreach (&filter_list_copy,
                      (DBusForeachFunction)dbus_message_handler_ref,
                      NULL);
 
-  /* We're still protected from dispatch_message reentrancy here
+  /* We're still protected from dispatch() reentrancy here
    * since we acquired the dispatcher
    */
   dbus_mutex_unlock (connection->mutex);
@@ -1855,8 +1958,9 @@ dbus_connection_dispatch_message (DBusConnection *connection)
                                                 name);
       if (handler != NULL)
         {
-         /* We're still protected from dispatch_message reentrancy here
-          * since we acquired the dispatcher */
+         /* We're still protected from dispatch() reentrancy here
+          * since we acquired the dispatcher
+           */
          dbus_mutex_unlock (connection->mutex);
 
           _dbus_verbose ("  running app handler on message %p\n", message);
@@ -1876,10 +1980,15 @@ dbus_connection_dispatch_message (DBusConnection *connection)
   _dbus_connection_release_dispatch (connection);
   dbus_mutex_unlock (connection->mutex);
   _dbus_list_free_link (message_link);
+  dbus_message_unref (message); /* don't want the message to count in max message limits
+                                 * in computing dispatch status
+                                 */
+  
+  status = dbus_connection_get_dispatch_status (connection);
+  
   dbus_connection_unref (connection);
-  dbus_message_unref (message);
   
-  return connection->n_incoming > 0;
+  return status;
 }
 
 /**
@@ -2458,6 +2567,9 @@ dbus_connection_get_max_message_size (DBusConnection *connection)
  * and that contains a half-dozen small messages, we may exceed the
  * size max by that amount. But this should be inconsequential.
  *
+ * This does imply that we can't call read() with a buffer larger
+ * than we're willing to exceed this limit by.
+ *
  * @param connection the connection
  * @param size the maximum size in bytes of all outstanding messages
  */
index 78a8d58..b4e6007 100644 (file)
@@ -56,6 +56,13 @@ typedef enum
                                  *   can be present in current state). */
 } DBusWatchFlags;
 
+typedef enum
+{
+  DBUS_DISPATCH_DATA_REMAINS,  /**< There is more data to potentially convert to messages. */
+  DBUS_DISPATCH_COMPLETE,      /**< All currently available data has been processed. */
+  DBUS_DISPATCH_NEED_MEMORY    /**< More memory is needed to continue. */
+} DBusDispatchStatus;
+
 typedef dbus_bool_t (* DBusAddWatchFunction)       (DBusWatch      *watch,
                                                     void           *data);
 typedef void        (* DBusWatchToggledFunction)   (DBusWatch      *watch,
@@ -70,58 +77,52 @@ typedef void        (* DBusTimeoutToggledFunction) (DBusTimeout    *timeout,
 typedef void        (* DBusRemoveTimeoutFunction)  (DBusTimeout    *timeout,
                                                     void           *data);
 
-DBusConnection* dbus_connection_open                   (const char     *address,
-                                                       DBusResultCode *result);
-void            dbus_connection_ref                    (DBusConnection *connection);
-void            dbus_connection_unref                  (DBusConnection *connection);
-void            dbus_connection_disconnect             (DBusConnection *connection);
-dbus_bool_t     dbus_connection_get_is_connected       (DBusConnection *connection);
-dbus_bool_t     dbus_connection_get_is_authenticated   (DBusConnection *connection);
-void            dbus_connection_flush                  (DBusConnection *connection);
-int             dbus_connection_get_n_messages         (DBusConnection *connection);
-DBusMessage*    dbus_connection_borrow_message         (DBusConnection *connection);
-void            dbus_connection_return_message         (DBusConnection *connection,
-                                                       DBusMessage    *message);
-void            dbus_connection_steal_borrowed_message (DBusConnection *connection,
-                                                       DBusMessage    *message);
-DBusMessage*    dbus_connection_pop_message            (DBusConnection *connection);
-dbus_bool_t     dbus_connection_dispatch_message       (DBusConnection *connection);
-
-
-dbus_bool_t  dbus_connection_send                      (DBusConnection     *connection,
-                                                        DBusMessage        *message,
-                                                        dbus_int32_t       *client_serial);
-dbus_bool_t  dbus_connection_send_with_reply           (DBusConnection     *connection,
-                                                        DBusMessage        *message,
-                                                        DBusMessageHandler *reply_handler,
-                                                        int                 timeout_milliseconds);
-DBusMessage *dbus_connection_send_with_reply_and_block (DBusConnection     *connection,
-                                                        DBusMessage        *message,
-                                                        int                 timeout_milliseconds,
-                                                        DBusError          *error);
-
-
-dbus_bool_t dbus_connection_set_watch_functions      (DBusConnection            *connection,
-                                                      DBusAddWatchFunction       add_function,
-                                                      DBusRemoveWatchFunction    remove_function,
-                                                      DBusWatchToggledFunction   toggled_function,
-                                                      void                      *data,
-                                                      DBusFreeFunction           free_data_function);
-dbus_bool_t dbus_connection_set_timeout_functions    (DBusConnection            *connection,
-                                                      DBusAddTimeoutFunction     add_function,
-                                                      DBusRemoveTimeoutFunction  remove_function,
-                                                      DBusTimeoutToggledFunction toggled_function,
-                                                      void                      *data,
-                                                      DBusFreeFunction           free_data_function);
-void        dbus_connection_set_wakeup_main_function (DBusConnection            *connection,
-                                                      DBusWakeupMainFunction     wakeup_main_function,
-                                                      void                      *data,
-                                                      DBusFreeFunction           free_data_function);
-void        dbus_connection_handle_watch             (DBusConnection            *connection,
-                                                      DBusWatch                 *watch,
-                                                      unsigned int               condition);
-
-
+DBusConnection*    dbus_connection_open                      (const char                 *address,
+                                                              DBusResultCode             *result);
+void               dbus_connection_ref                       (DBusConnection             *connection);
+void               dbus_connection_unref                     (DBusConnection             *connection);
+void               dbus_connection_disconnect                (DBusConnection             *connection);
+dbus_bool_t        dbus_connection_get_is_connected          (DBusConnection             *connection);
+dbus_bool_t        dbus_connection_get_is_authenticated      (DBusConnection             *connection);
+void               dbus_connection_flush                     (DBusConnection             *connection);
+DBusMessage*       dbus_connection_borrow_message            (DBusConnection             *connection);
+void               dbus_connection_return_message            (DBusConnection             *connection,
+                                                              DBusMessage                *message);
+void               dbus_connection_steal_borrowed_message    (DBusConnection             *connection,
+                                                              DBusMessage                *message);
+DBusMessage*       dbus_connection_pop_message               (DBusConnection             *connection);
+DBusDispatchStatus dbus_connection_get_dispatch_status       (DBusConnection             *connection);
+DBusDispatchStatus dbus_connection_dispatch                  (DBusConnection             *connection);
+dbus_bool_t        dbus_connection_send                      (DBusConnection             *connection,
+                                                              DBusMessage                *message,
+                                                              dbus_int32_t               *client_serial);
+dbus_bool_t        dbus_connection_send_with_reply           (DBusConnection             *connection,
+                                                              DBusMessage                *message,
+                                                              DBusMessageHandler         *reply_handler,
+                                                              int                         timeout_milliseconds);
+DBusMessage *      dbus_connection_send_with_reply_and_block (DBusConnection             *connection,
+                                                              DBusMessage                *message,
+                                                              int                         timeout_milliseconds,
+                                                              DBusError                  *error);
+dbus_bool_t        dbus_connection_set_watch_functions       (DBusConnection             *connection,
+                                                              DBusAddWatchFunction        add_function,
+                                                              DBusRemoveWatchFunction     remove_function,
+                                                              DBusWatchToggledFunction    toggled_function,
+                                                              void                       *data,
+                                                              DBusFreeFunction            free_data_function);
+dbus_bool_t        dbus_connection_set_timeout_functions     (DBusConnection             *connection,
+                                                              DBusAddTimeoutFunction      add_function,
+                                                              DBusRemoveTimeoutFunction   remove_function,
+                                                              DBusTimeoutToggledFunction  toggled_function,
+                                                              void                       *data,
+                                                              DBusFreeFunction            free_data_function);
+void               dbus_connection_set_wakeup_main_function  (DBusConnection             *connection,
+                                                              DBusWakeupMainFunction      wakeup_main_function,
+                                                              void                       *data,
+                                                              DBusFreeFunction            free_data_function);
+void               dbus_connection_handle_watch              (DBusConnection             *connection,
+                                                              DBusWatch                  *watch,
+                                                              unsigned int                condition);
 
 
 int          dbus_watch_get_fd      (DBusWatch        *watch);
index a426b37..f7c43f5 100644 (file)
@@ -156,7 +156,9 @@ _dbus_set_fail_alloc_counter (int until_next_fail)
 
   fail_alloc_counter = until_next_fail;
 
+#if 0
   _dbus_verbose ("Set fail alloc counter = %d\n", fail_alloc_counter);
+#endif
 }
 
 /**
index 437dbfd..13ba550 100644 (file)
@@ -100,7 +100,7 @@ struct DBusMemPool
 
   DBusFreedElement *free_elements; /**< a free list of elements to recycle */
   DBusMemBlock *blocks;            /**< blocks of memory from malloc() */
-  int allocated_elements;          /* Count of outstanding allocated elements */
+  int allocated_elements;          /**< Count of outstanding allocated elements */
 };
 
 /** @} */
index 143d7b0..6d5bb78 100644 (file)
@@ -137,7 +137,7 @@ _dbus_message_handler_handle_message (DBusMessageHandler        *handler,
   dbus_mutex_unlock (message_handler_lock);
   
   /* This function doesn't ref handler/connection/message
-   * since that's done in dbus_connection_dispatch_message().
+   * since that's done in dbus_connection_dispatch().
    */
   if (function != NULL)
     return (* function) (handler, connection, message, user_data);
index 9c4ee64..ef1453a 100644 (file)
@@ -51,7 +51,8 @@ void               _dbus_message_loader_get_buffer            (DBusMessageLoader
 void               _dbus_message_loader_return_buffer         (DBusMessageLoader  *loader,
                                                                DBusString         *buffer,
                                                                int                 bytes_read);
-
+dbus_bool_t        _dbus_message_loader_queue_messages        (DBusMessageLoader  *loader);
+DBusMessage*       _dbus_message_loader_peek_message          (DBusMessageLoader  *loader);
 DBusMessage*       _dbus_message_loader_pop_message           (DBusMessageLoader  *loader);
 DBusList*          _dbus_message_loader_pop_message_link      (DBusMessageLoader  *loader);
 
index a3b713e..6bcc206 100644 (file)
@@ -2676,12 +2676,6 @@ decode_header_data (const DBusString   *data,
  * in. This function must always be called, even if no bytes were
  * successfully read.
  *
- * @todo if we run out of memory in here, we offer no way for calling
- * code to handle it, i.e. they can't re-run the message parsing
- * attempt. Perhaps much of this code could be moved to pop_message()?
- * But then that may need to distinguish NULL return for no messages
- * from NULL return for errors.
- *
  * @param loader the loader.
  * @param buffer the buffer.
  * @param bytes_read number of bytes that were read into the buffer.
@@ -2695,9 +2689,19 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
   _dbus_assert (buffer == &loader->data);
 
   loader->buffer_outstanding = FALSE;
+}
 
+/**
+ * Converts buffered data into messages.
+ *
+ * @param loader the loader.
+ * @returns #TRUE if we had enough memory to finish.
+ */
+dbus_bool_t
+_dbus_message_loader_queue_messages (DBusMessageLoader *loader)
+{
   if (loader->corrupted)
-    return;
+    return TRUE;
 
   while (_dbus_string_get_length (&loader->data) >= 16)
     {
@@ -2715,7 +2719,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
           _dbus_verbose ("Message has protocol version %d ours is %d\n",
                          (int) header_data[2], DBUS_MAJOR_PROTOCOL_VERSION);
           loader->corrupted = TRUE;
-          return;
+          return TRUE;
         }
       
       byte_order = header_data[0];
@@ -2726,7 +2730,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
           _dbus_verbose ("Message with bad byte order '%c' received\n",
                          byte_order);
          loader->corrupted = TRUE;
-         return;
+         return TRUE;
        }
 
       header_len_unsigned = _dbus_unpack_uint32 (byte_order, header_data + 4);
@@ -2737,7 +2741,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
           _dbus_verbose ("Message had broken too-small header length %u\n",
                          header_len_unsigned);
           loader->corrupted = TRUE;
-          return;
+          return TRUE;
         }
 
       if (header_len_unsigned > (unsigned) MAX_SANE_MESSAGE_SIZE ||
@@ -2747,7 +2751,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
                          header_len_unsigned,
                          body_len_unsigned);
           loader->corrupted = TRUE;
-          return;
+          return TRUE;
         }
 
       /* Now that we know the values are in signed range, get
@@ -2762,7 +2766,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
           _dbus_verbose ("header length %d is not aligned to 8 bytes\n",
                          header_len);
           loader->corrupted = TRUE;
-          return;
+          return TRUE;
         }
       
       if (header_len + body_len > loader->max_message_size)
@@ -2770,7 +2774,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
           _dbus_verbose ("Message claimed length header = %d body = %d exceeds max message length %d\n",
                          header_len, body_len, loader->max_message_size);
          loader->corrupted = TRUE;
-         return;
+         return TRUE;
        }
 
       if (_dbus_string_get_length (&loader->data) >= (header_len + body_len))
@@ -2787,7 +2791,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
            {
               _dbus_verbose ("Header was invalid\n");
              loader->corrupted = TRUE;
-             return;
+             return TRUE;
            }
           
           next_arg = header_len;
@@ -2801,7 +2805,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
                                                &next_arg))
                 {
                   loader->corrupted = TRUE;
-                  return;
+                  return TRUE;
                 }
 
               _dbus_assert (next_arg > prev);
@@ -2813,12 +2817,12 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
                              next_arg, header_len, body_len,
                              header_len + body_len);
               loader->corrupted = TRUE;
-              return;
+              return TRUE;
             }
 
          message = dbus_message_new_empty_header ();
          if (message == NULL)
-            break; /* ugh, postpone this I guess. */
+            return FALSE;
 
           message->byte_order = byte_order;
           message->header_padding = header_padding;
@@ -2834,7 +2838,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
          if (!_dbus_list_append (&loader->messages, message))
             {
               dbus_message_unref (message);
-              break;
+              return FALSE;
             }
 
           _dbus_assert (_dbus_string_get_length (&message->header) == 0);
@@ -2847,7 +2851,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
             {
               _dbus_list_remove_last (&loader->messages, message);
               dbus_message_unref (message);
-              break;
+              return FALSE;
             }
           
          if (!_dbus_string_move_len (&loader->data, 0, body_len, &message->body, 0))
@@ -2861,7 +2865,7 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
 
               _dbus_list_remove_last (&loader->messages, message);
               dbus_message_unref (message);
-              break;
+              return FALSE;
             }
 
           _dbus_assert (_dbus_string_get_length (&message->header) == header_len);
@@ -2876,14 +2880,32 @@ _dbus_message_loader_return_buffer (DBusMessageLoader  *loader,
          _dbus_verbose ("Loaded message %p\n", message);
        }
       else
-       break;
+        return TRUE;
     }
+
+  return TRUE;
+}
+
+/**
+ * Peeks at first loaded message, returns #NULL if no messages have
+ * been queued.
+ *
+ * @param loader the loader.
+ * @returns the next message, or #NULL if none.
+ */
+DBusMessage*
+_dbus_message_loader_peek_message (DBusMessageLoader *loader)
+{
+  if (loader->messages)
+    return loader->messages->data;
+  else
+    return NULL;
 }
 
 /**
  * Pops a loaded message (passing ownership of the message
  * to the caller). Returns #NULL if no messages have been
- * loaded.
+ * queued.
  *
  * @param loader the loader.
  * @returns the next message, or #NULL if none.
@@ -3153,6 +3175,9 @@ check_have_valid_message (DBusMessageLoader *loader)
 
   message = NULL;
   retval = FALSE;
+
+  if (!_dbus_message_loader_queue_messages (loader))
+    _dbus_assert_not_reached ("no memory to queue messages");
   
   if (_dbus_message_loader_get_is_corrupted (loader))
     {
@@ -3195,6 +3220,9 @@ check_invalid_message (DBusMessageLoader *loader)
   dbus_bool_t retval;
 
   retval = FALSE;
+
+  if (!_dbus_message_loader_queue_messages (loader))
+    _dbus_assert_not_reached ("no memory to queue messages");
   
   if (!_dbus_message_loader_get_is_corrupted (loader))
     {
@@ -3216,6 +3244,9 @@ check_incomplete_message (DBusMessageLoader *loader)
 
   message = NULL;
   retval = FALSE;
+
+  if (!_dbus_message_loader_queue_messages (loader))
+    _dbus_assert_not_reached ("no memory to queue messages");
   
   if (_dbus_message_loader_get_is_corrupted (loader))
     {
@@ -3242,6 +3273,9 @@ static dbus_bool_t
 check_loader_results (DBusMessageLoader      *loader,
                       DBusMessageValidity     validity)
 {
+  if (!_dbus_message_loader_queue_messages (loader))
+    _dbus_assert_not_reached ("no memory to queue messages");
+  
   switch (validity)
     {
     case _DBUS_MESSAGE_VALID:
@@ -3735,6 +3769,9 @@ _dbus_message_test (const char *test_data_dir)
   dbus_message_unref (message);
 
   /* Now pop back the message */
+  if (!_dbus_message_loader_queue_messages (loader))
+    _dbus_assert_not_reached ("no memory to queue messages");
+  
   if (_dbus_message_loader_get_is_corrupted (loader))
     _dbus_assert_not_reached ("message loader corrupted");
   
index 48703e1..79ed7ed 100644 (file)
@@ -151,7 +151,7 @@ _dbus_server_remove_watch  (DBusServer *server,
  * function on a watch that was not previously added.
  *
  * @param server the server.
- * @param timeout the timeout to toggle.
+ * @param watch the watch to toggle.
  * @param enabled whether to enable or disable
  */
 void
index 1a50dac..1bc3e20 100644 (file)
@@ -2395,6 +2395,7 @@ _dbus_string_validate_utf8  (const DBusString *str,
                              int               len)
 {
   const unsigned char *p;
+  const unsigned char *end;
   DBUS_CONST_STRING_PREAMBLE (str);
   _dbus_assert (start >= 0);
   _dbus_assert (start <= real->len);
@@ -2403,9 +2404,10 @@ _dbus_string_validate_utf8  (const DBusString *str,
   if (len > real->len - start)
     return FALSE;
   
-  p = real->str;
+  p = real->str + start;
+  end = p + len;
   
-  while (p - real->str < len && *p)
+  while (p < end)
     {
       int i, mask = 0, char_len;
       dbus_unichar_t result;
@@ -2416,20 +2418,20 @@ _dbus_string_validate_utf8  (const DBusString *str,
       if (char_len == -1)
         break;
 
-      /* check that the expected number of bytes exists in real->str */
-      if ((len - (p - real->str)) < char_len)
+      /* check that the expected number of bytes exists in the remaining length */
+      if ((end - p) < char_len)
         break;
         
       UTF8_GET (result, p, i, mask, char_len);
 
       if (UTF8_LENGTH (result) != char_len) /* Check for overlong UTF-8 */
-       break;
+        break;
 
       if (result == (dbus_unichar_t)-1)
         break;
 
       if (!UNICODE_VALID (result))
-       break;
+        break;
       
       p += char_len;
     }
@@ -2437,7 +2439,7 @@ _dbus_string_validate_utf8  (const DBusString *str,
   /* See that we covered the entire length if a length was
    * passed in
    */
-  if (p != (real->str + len))
+  if (p != end)
     return FALSE;
   else
     return TRUE;
index ed68658..9ea5ce1 100644 (file)
@@ -178,37 +178,6 @@ do_io_error (DBusTransport *transport)
   _dbus_transport_unref (transport);
 }
 
-static void
-queue_messages (DBusTransport *transport)
-{
-  DBusList *link;
-  
-  /* Queue any messages */
-  while ((link = _dbus_message_loader_pop_message_link (transport->loader)))
-    {
-      DBusMessage *message;
-
-      message = link->data;
-      
-      _dbus_verbose ("queueing received message %p\n", message);
-
-      _dbus_message_add_size_counter (message, transport->live_messages_size);
-
-      /* pass ownership of link and message ref to connection */
-      _dbus_connection_queue_received_message_link (transport->connection,
-                                                    link);
-    }
-
-  if (_dbus_message_loader_get_is_corrupted (transport->loader))
-    {
-      _dbus_verbose ("Corrupted message stream, disconnecting\n");
-      do_io_error (transport);
-    }
-
-  /* check read watch in case we've now exceeded max outstanding messages */
-  check_read_watch (transport);
-}
-
 /* return value is whether we successfully read any new data. */
 static dbus_bool_t
 read_data_into_auth (DBusTransport *transport)
@@ -398,8 +367,6 @@ recover_unused_bytes (DBusTransport *transport)
                                           orig_len);
     }
   
-  queue_messages (transport);
-
   return;
 
  nomem:
@@ -777,7 +744,8 @@ do_reading (DBusTransport *transport)
       
       total += bytes_read;      
 
-      queue_messages (transport);
+      if (_dbus_transport_queue_messages (transport) == DBUS_DISPATCH_NEED_MEMORY)
+        goto out;
       
       /* Try reading more data until we get EAGAIN and return, or
        * exceed max bytes per iteration.  If in blocking mode of
index 8c6c7f1..b2355ea 100644 (file)
@@ -507,6 +507,70 @@ _dbus_transport_do_iteration (DBusTransport  *transport,
 }
 
 /**
+ * Reports our current dispatch status (whether there's buffered
+ * data to be queued as messages, or not, or we need memory).
+ *
+ * @param transport the transport
+ * @returns current status
+ */
+DBusDispatchStatus
+_dbus_transport_get_dispatch_status (DBusTransport *transport)
+{
+  if (_dbus_counter_get_value (transport->live_messages_size) >= transport->max_live_messages_size)
+    return DBUS_DISPATCH_COMPLETE; /* complete for now */
+
+  if (!_dbus_message_loader_queue_messages (transport->loader))
+    return DBUS_DISPATCH_NEED_MEMORY;
+
+  if (_dbus_message_loader_peek_message (transport->loader) != NULL)
+    return DBUS_DISPATCH_DATA_REMAINS;
+  else
+    return DBUS_DISPATCH_COMPLETE;
+}
+
+/**
+ * Processes data we've read while handling a watch, potentially
+ * converting some of it to messages and queueing those messages on
+ * the connection.
+ *
+ * @param transport the transport
+ * @returns #TRUE if we had enough memory to queue all messages
+ */
+dbus_bool_t
+_dbus_transport_queue_messages (DBusTransport *transport)
+{
+  DBusDispatchStatus status;
+  
+  /* Queue any messages */
+  while ((status = _dbus_transport_get_dispatch_status (transport)) == DBUS_DISPATCH_DATA_REMAINS)
+    {
+      DBusMessage *message;
+      DBusList *link;
+
+      link = _dbus_message_loader_pop_message_link (transport->loader);
+      _dbus_assert (link != NULL);
+      
+      message = link->data;
+      
+      _dbus_verbose ("queueing received message %p\n", message);
+
+      _dbus_message_add_size_counter (message, transport->live_messages_size);
+
+      /* pass ownership of link and message ref to connection */
+      _dbus_connection_queue_received_message_link (transport->connection,
+                                                    link);
+    }
+
+  if (_dbus_message_loader_get_is_corrupted (transport->loader))
+    {
+      _dbus_verbose ("Corrupted message stream, disconnecting\n");
+      _dbus_transport_disconnect (transport);
+    }
+
+  return status != DBUS_DISPATCH_NEED_MEMORY;
+}
+
+/**
  * See dbus_connection_set_max_message_size().
  *
  * @param transport the transport
index 1f01788..ad3299c 100644 (file)
@@ -30,29 +30,31 @@ DBUS_BEGIN_DECLS;
 
 typedef struct DBusTransport DBusTransport;
 
-DBusTransport* _dbus_transport_open                       (const char     *address,
-                                                           DBusResultCode *result);
-void           _dbus_transport_ref                        (DBusTransport  *transport);
-void           _dbus_transport_unref                      (DBusTransport  *transport);
-void           _dbus_transport_disconnect                 (DBusTransport  *transport);
-dbus_bool_t    _dbus_transport_get_is_connected           (DBusTransport  *transport);
-dbus_bool_t    _dbus_transport_get_is_authenticated       (DBusTransport  *transport);
-void           _dbus_transport_handle_watch               (DBusTransport  *transport,
-                                                           DBusWatch      *watch,
-                                                           unsigned int    condition);
-dbus_bool_t    _dbus_transport_set_connection             (DBusTransport  *transport,
-                                                           DBusConnection *connection);
-void           _dbus_transport_messages_pending           (DBusTransport  *transport,
-                                                           int             queue_length);
-void           _dbus_transport_do_iteration               (DBusTransport  *transport,
-                                                           unsigned int    flags,
-                                                           int             timeout_milliseconds);
-void           _dbus_transport_set_max_message_size       (DBusTransport  *transport,
-                                                           long            size);
-long           _dbus_transport_get_max_message_size       (DBusTransport  *transport);
-void           _dbus_transport_set_max_live_messages_size (DBusTransport  *transport,
-                                                           long            size);
-long           _dbus_transport_get_max_live_messages_size (DBusTransport  *transport);
+DBusTransport*     _dbus_transport_open                       (const char     *address,
+                                                               DBusResultCode *result);
+void               _dbus_transport_ref                        (DBusTransport  *transport);
+void               _dbus_transport_unref                      (DBusTransport  *transport);
+void               _dbus_transport_disconnect                 (DBusTransport  *transport);
+dbus_bool_t        _dbus_transport_get_is_connected           (DBusTransport  *transport);
+dbus_bool_t        _dbus_transport_get_is_authenticated       (DBusTransport  *transport);
+void               _dbus_transport_handle_watch               (DBusTransport  *transport,
+                                                               DBusWatch      *watch,
+                                                               unsigned int    condition);
+dbus_bool_t        _dbus_transport_set_connection             (DBusTransport  *transport,
+                                                               DBusConnection *connection);
+void               _dbus_transport_messages_pending           (DBusTransport  *transport,
+                                                               int             queue_length);
+void               _dbus_transport_do_iteration               (DBusTransport  *transport,
+                                                               unsigned int    flags,
+                                                               int             timeout_milliseconds);
+DBusDispatchStatus _dbus_transport_get_dispatch_status        (DBusTransport  *transport);
+dbus_bool_t        _dbus_transport_queue_messages             (DBusTransport  *transport);
+void               _dbus_transport_set_max_message_size       (DBusTransport  *transport,
+                                                               long            size);
+long               _dbus_transport_get_max_message_size       (DBusTransport  *transport);
+void               _dbus_transport_set_max_live_messages_size (DBusTransport  *transport,
+                                                               long            size);
+long               _dbus_transport_get_max_live_messages_size (DBusTransport  *transport);
 
 
 DBUS_END_DECLS;
index 5eb75d1..b00320f 100644 (file)
@@ -63,47 +63,47 @@ static int connection_slot = -1;
 static GStaticMutex server_slot_lock = G_STATIC_MUTEX_INIT;
 static int server_slot = -1;
 
-static gboolean dbus_connection_prepare  (GSource     *source,
-                                          gint        *timeout);
-static gboolean dbus_connection_check    (GSource     *source);
-static gboolean dbus_connection_dispatch (GSource     *source,
-                                          GSourceFunc  callback,
-                                          gpointer     user_data);
-static gboolean dbus_server_prepare      (GSource     *source,
-                                          gint        *timeout);
-static gboolean dbus_server_check        (GSource     *source);
-static gboolean dbus_server_dispatch     (GSource     *source,
-                                          GSourceFunc  callback,
-                                          gpointer     user_data);
+static gboolean gsource_connection_prepare  (GSource     *source,
+                                             gint        *timeout);
+static gboolean gsource_connection_check    (GSource     *source);
+static gboolean gsource_connection_dispatch (GSource     *source,
+                                             GSourceFunc  callback,
+                                             gpointer     user_data);
+static gboolean gsource_server_prepare      (GSource     *source,
+                                             gint        *timeout);
+static gboolean gsource_server_check        (GSource     *source);
+static gboolean gsource_server_dispatch     (GSource     *source,
+                                             GSourceFunc  callback,
+                                             gpointer     user_data);
 
 static GSourceFuncs dbus_connection_funcs = {
-  dbus_connection_prepare,
-  dbus_connection_check,
-  dbus_connection_dispatch,
+  gsource_connection_prepare,
+  gsource_connection_check,
+  gsource_connection_dispatch,
   NULL
 };
 
 static GSourceFuncs dbus_server_funcs = {
-  dbus_server_prepare,
-  dbus_server_check,
-  dbus_server_dispatch,
+  gsource_server_prepare,
+  gsource_server_check,
+  gsource_server_dispatch,
   NULL
 };
 
 static gboolean
-dbus_connection_prepare (GSource *source,
-                        gint    *timeout)
+gsource_connection_prepare (GSource *source,
+                            gint    *timeout)
 {
   DBusConnection *connection = ((DBusGSource *)source)->connection_or_server;
   
   *timeout = -1;
 
-  return (dbus_connection_get_n_messages (connection) > 0);  
+  return (dbus_connection_get_dispatch_status (connection) == DBUS_DISPATCH_DATA_REMAINS);  
 }
 
 static gboolean
-dbus_server_prepare (GSource *source,
-                     gint    *timeout)
+gsource_server_prepare (GSource *source,
+                        gint    *timeout)
 {
   *timeout = -1;
 
@@ -132,13 +132,13 @@ dbus_gsource_check (GSource *source)
 }
 
 static gboolean
-dbus_connection_check (GSource *source)
+gsource_connection_check (GSource *source)
 {
   return dbus_gsource_check (source);
 }
 
 static gboolean
-dbus_server_check (GSource *source)
+gsource_server_check (GSource *source)
 {
   return dbus_gsource_check (source);
 }
@@ -192,9 +192,9 @@ dbus_gsource_dispatch (GSource     *source,
 }
 
 static gboolean
-dbus_connection_dispatch (GSource     *source,
-                         GSourceFunc  callback,
-                         gpointer     user_data)
+gsource_connection_dispatch (GSource     *source,
+                             GSourceFunc  callback,
+                             gpointer     user_data)
 {
   DBusGSource *dbus_source = (DBusGSource *)source;
   DBusConnection *connection = dbus_source->connection_or_server;
@@ -205,7 +205,7 @@ dbus_connection_dispatch (GSource     *source,
                          FALSE);
   
   /* Dispatch messages */
-  while (dbus_connection_dispatch_message (connection))
+  while (dbus_connection_dispatch (connection) == DBUS_DISPATCH_DATA_REMAINS)
     ;
 
    dbus_connection_unref (connection);
@@ -214,9 +214,9 @@ dbus_connection_dispatch (GSource     *source,
 }
 
 static gboolean
-dbus_server_dispatch (GSource     *source,
-                      GSourceFunc  callback,
-                      gpointer     user_data)
+gsource_server_dispatch (GSource     *source,
+                         GSourceFunc  callback,
+                         gpointer     user_data)
 {
   DBusGSource *dbus_source = (DBusGSource *)source;
   DBusServer *server = dbus_source->connection_or_server;