2003-02-16 Alexander Larsson <alexl@redhat.com>
authorAlexander Larsson <alexl@redhat.com>
Sun, 16 Feb 2003 15:18:35 +0000 (15:18 +0000)
committerAlexander Larsson <alexl@redhat.com>
Sun, 16 Feb 2003 15:18:35 +0000 (15:18 +0000)
* dbus/dbus-connection.c:
Implement sent_message_with_reply. (with_reply_and block is still
busted).
Made dispatch_message not lose message if OOM.

* dbus/dbus-errors.h:
Add NoReply error (for reply timeouts).

ChangeLog
dbus/dbus-connection.c
dbus/dbus-errors.h

index 29dcd4e..5f2710b 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,15 @@
 2003-02-16  Alexander Larsson  <alexl@redhat.com>
 
+       * dbus/dbus-connection.c:
+       Implement sent_message_with_reply. (with_reply_and block is still
+       busted).
+       Made dispatch_message not lose message if OOM.
+       
+       * dbus/dbus-errors.h:
+       Add NoReply error (for reply timeouts).
+
+2003-02-16  Alexander Larsson  <alexl@redhat.com>
+
        * dbus/dbus-hash.c (_dbus_hash_table_unref):
        Actually free keys and values when destroying hashtable.
 
index f97a5e1..bedeab4 100644 (file)
@@ -110,13 +110,33 @@ struct DBusConnection
   DBusDataSlot *data_slots;        /**< Data slots */
   int           n_slots; /**< Slots allocated so far. */
 
+  DBusHashTable *pending_replies;  /**< Hash of message serials and their message handlers. */  
   DBusCounter *connection_counter; /**< Counter that we decrement when finalized */
   
   int client_serial;            /**< Client serial. Increments each time a message is sent  */
   DBusList *disconnect_message_link;
 };
 
+typedef struct
+{
+  DBusConnection *connection;
+  DBusMessageHandler *handler;
+  DBusTimeout *timeout;
+  int serial;
+
+  DBusList *timeout_link; /* Preallocated timeout response */
+  
+  dbus_bool_t timeout_added;
+  dbus_bool_t connection_added;
+} ReplyHandlerData;
+
+static void reply_handler_data_free (ReplyHandlerData *data);
+
 static void _dbus_connection_free_data_slots_nolock (DBusConnection *connection);
+static void _dbus_connection_remove_timeout_locked (DBusConnection *connection,
+                                                   DBusTimeout    *timeout);
+
+
 
 /**
  * Adds a message to the incoming message queue, returning #FALSE
@@ -130,11 +150,29 @@ dbus_bool_t
 _dbus_connection_queue_received_message (DBusConnection *connection,
                                          DBusMessage    *message)
 {
+  ReplyHandlerData *reply_handler_data;
+  dbus_int32_t reply_serial;
+  
   _dbus_assert (_dbus_transport_get_is_authenticated (connection->transport));
   
   if (!_dbus_list_append (&connection->incoming_messages,
                           message))
     return FALSE;
+
+  /* If this is a reply we're waiting on, remove timeout for it */
+  reply_serial = _dbus_message_get_reply_serial (message);
+  if (reply_serial != -1)
+    {
+      reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies,
+                                                       reply_serial);
+      if (reply_handler_data != NULL)
+       {
+         if (reply_handler_data->timeout_added)
+           _dbus_connection_remove_timeout_locked (connection,
+                                                   reply_handler_data->timeout);
+         reply_handler_data->timeout_added = FALSE;
+       }
+    }
   
   dbus_message_ref (message);
   connection->n_incoming += 1;
@@ -296,6 +334,16 @@ _dbus_connection_remove_timeout (DBusConnection *connection,
                                       timeout);
 }
 
+static void
+_dbus_connection_remove_timeout_locked (DBusConnection *connection,
+                                       DBusTimeout    *timeout)
+{
+    dbus_mutex_lock (connection->mutex);
+    _dbus_connection_remove_timeout (connection, timeout);
+    dbus_mutex_unlock (connection->mutex);
+}
+
+
 /**
  * Tells the connection that the transport has been disconnected.
  * Results in posting a disconnect message on the incoming message
@@ -422,17 +470,18 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
   DBusConnection *connection;
   DBusWatchList *watch_list;
   DBusTimeoutList *timeout_list;
-  DBusHashTable *handler_table;
+  DBusHashTable *handler_table, *pending_replies;
   DBusMutex *mutex;
   DBusCondVar *message_returned_cond;
   DBusCondVar *dispatch_cond;
   DBusCondVar *io_path_cond;
   DBusList *disconnect_link;
   DBusMessage *disconnect_message;
-  
+
   watch_list = NULL;
   connection = NULL;
   handler_table = NULL;
+  pending_replies = NULL;
   timeout_list = NULL;
   mutex = NULL;
   message_returned_cond = NULL;
@@ -454,6 +503,12 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
                           dbus_free, NULL);
   if (handler_table == NULL)
     goto error;
+
+  pending_replies =
+    _dbus_hash_table_new (DBUS_HASH_INT,
+                         NULL, (DBusFreeFunction)reply_handler_data_free);
+  if (pending_replies == NULL)
+    goto error;
   
   connection = dbus_new0 (DBusConnection, 1);
   if (connection == NULL)
@@ -495,6 +550,7 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
   connection->watches = watch_list;
   connection->timeouts = timeout_list;
   connection->handler_table = handler_table;
+  connection->pending_replies = pending_replies;
   connection->filter_list = NULL;
 
   connection->data_slots = NULL;
@@ -532,6 +588,9 @@ _dbus_connection_new_for_transport (DBusTransport *transport)
 
   if (handler_table)
     _dbus_hash_table_unref (handler_table);
+
+  if (pending_replies)
+    _dbus_hash_table_unref (pending_replies);
   
   if (watch_list)
     _dbus_watch_list_free (watch_list);
@@ -738,9 +797,12 @@ _dbus_connection_last_unref (DBusConnection *connection)
       
       link = next;
     }
-  
+
   _dbus_hash_table_unref (connection->handler_table);
   connection->handler_table = NULL;
+
+  _dbus_hash_table_unref (connection->pending_replies);
+  connection->pending_replies = NULL;
   
   _dbus_list_clear (&connection->filter_list);
   
@@ -919,6 +981,54 @@ dbus_connection_send_message (DBusConnection *connection,
   return TRUE;
 }
 
+static void
+reply_handler_timeout (void *data)
+{
+  DBusConnection *connection;
+  ReplyHandlerData *reply_handler_data = data;
+
+  connection = reply_handler_data->connection;
+  
+  dbus_mutex_lock (connection->mutex);
+  if (reply_handler_data->timeout_link)
+    {
+      _dbus_connection_queue_synthesized_message_link (connection,
+                                                      reply_handler_data->timeout_link);
+      reply_handler_data->timeout_link = NULL;
+    }
+
+  _dbus_connection_remove_timeout (connection,
+                                  reply_handler_data->timeout);
+  reply_handler_data->timeout_added = FALSE;
+  
+  dbus_mutex_unlock (connection->mutex);
+}
+
+static void
+reply_handler_data_free (ReplyHandlerData *data)
+{
+  if (!data)
+    return;
+
+  if (data->timeout_added)
+    _dbus_connection_remove_timeout_locked (data->connection,
+                                           data->timeout);
+
+  if (data->connection_added)
+    _dbus_message_handler_remove_connection (data->handler,
+                                            data->connection);
+
+  if (data->timeout_link)
+    {
+      dbus_message_unref ((DBusMessage *)data->timeout_link->data);
+      _dbus_list_free_link (data->timeout_link);
+    }
+  
+  dbus_message_handler_unref (data->handler);
+  
+  dbus_free (data);
+}
+
 /**
  * Queues a message to send, as with dbus_connection_send_message(),
  * but also sets up a DBusMessageHandler to receive a reply to the
@@ -934,8 +1044,8 @@ dbus_connection_send_message (DBusConnection *connection,
  * message as a reply, after a reply has been seen the handler is
  * removed. If a filter filters out the reply before the handler sees
  * it, the handler is not removed but the timeout will immediately
- * fire again. If a filter was dumb and kept removing the timeout
- * reply then we'd get in an infinite loop.
+ * fire. If a filter was dumb and removed the timeout reply then
+ * the reply is lost (this will give a runtime warning).
  * 
  * If #NULL is passed for the reply_handler, the timeout reply will
  * still be generated and placed into the message queue, but no
@@ -969,8 +1079,120 @@ dbus_connection_send_message_with_reply (DBusConnection     *connection,
                                          int                 timeout_milliseconds,
                                          DBusResultCode     *result)
 {
-  /* FIXME */
-  return dbus_connection_send_message (connection, message, NULL, result);
+  DBusTimeout *timeout;
+  ReplyHandlerData *data;
+  DBusMessage *reply;
+  DBusList *reply_link;
+  dbus_int32_t serial = -1;
+  
+  if (timeout_milliseconds == -1)
+    timeout_milliseconds = DEFAULT_TIMEOUT_VALUE;
+
+  data = dbus_new0 (ReplyHandlerData, 1);
+
+  if (!data)
+    {
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+  
+  timeout = _dbus_timeout_new (timeout_milliseconds, reply_handler_timeout,
+                              data, NULL);
+
+  if (!timeout)
+    {
+      reply_handler_data_free (data);
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+
+  dbus_mutex_lock (connection->mutex);
+  
+  /* Add timeout */
+  if (!_dbus_connection_add_timeout (connection, timeout))
+    {
+      reply_handler_data_free (data);
+      _dbus_timeout_unref (timeout);
+      dbus_mutex_unlock (connection->mutex);
+      
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+
+  /* The connection now owns the reference to the timeout. */
+  _dbus_timeout_unref (timeout);
+  
+  data->timeout_added = TRUE;
+  data->timeout = timeout;
+  data->connection = connection;
+  
+  if (!_dbus_message_handler_add_connection (reply_handler, connection))
+    {
+      dbus_mutex_unlock (connection->mutex);
+      reply_handler_data_free (data);
+      
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+  data->connection_added = TRUE;
+  
+  /* Assign a serial to the message */
+  if (_dbus_message_get_client_serial (message) == -1)
+    {
+      serial = _dbus_connection_get_next_client_serial (connection);
+      _dbus_message_set_client_serial (message, serial);
+    }
+
+  data->handler = reply_handler;
+  data->serial = serial;
+
+  dbus_message_handler_ref (reply_handler);
+
+  reply = dbus_message_new_error_reply (message, DBUS_ERROR_NO_REPLY,
+                                       "No reply within specified time");
+  if (!reply)
+    {
+      dbus_mutex_unlock (connection->mutex);
+      reply_handler_data_free (data);
+      
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+
+  reply_link = _dbus_list_alloc_link (reply);
+  if (!reply)
+    {
+      dbus_mutex_unlock (connection->mutex);
+      dbus_message_unref (reply);
+      reply_handler_data_free (data);
+      
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      return FALSE;
+    }
+
+  data->timeout_link = reply_link;
+  
+  /* Insert the serial in the pending replies hash. */
+  if (!_dbus_hash_table_insert_int (connection->pending_replies, serial, data))
+    {
+      dbus_set_result (result, DBUS_RESULT_NO_MEMORY);
+      dbus_mutex_unlock (connection->mutex);
+      reply_handler_data_free (data);
+      
+      return FALSE;
+    }
+
+  dbus_mutex_unlock (connection->mutex);
+  
+  if (!dbus_connection_send_message (connection, message, NULL, result))
+    {
+      /* This will free the handler data too */
+      _dbus_hash_table_remove_int (connection->pending_replies, serial);
+      return FALSE;
+    }
+
+  dbus_set_result (result, DBUS_RESULT_SUCCESS);  
+  return TRUE;
 }
 
 /**
@@ -1256,6 +1478,15 @@ _dbus_connection_release_dispatch (DBusConnection *connection)
   dbus_condvar_wake_one (connection->dispatch_cond);
 }
 
+static void
+_dbus_connection_failed_pop (DBusConnection *connection,
+                            DBusList *message_link)
+{
+  _dbus_list_prepend_link (&connection->incoming_messages,
+                          message_link);
+  connection->n_incoming += 1;
+}
+
 /**
  * Pops the first-received message from the current incoming message
  * queue, runs any handlers for it, then unrefs the message.
@@ -1266,11 +1497,19 @@ _dbus_connection_release_dispatch (DBusConnection *connection)
 dbus_bool_t
 dbus_connection_dispatch_message (DBusConnection *connection)
 {
+  DBusMessageHandler *handler;
   DBusMessage *message;
-  DBusList *link, *filter_list_copy;
+  DBusList *link, *filter_list_copy, *message_link;
   DBusHandlerResult result;
+  ReplyHandlerData *reply_handler_data;
   const char *name;
+  dbus_int32_t reply_serial;
 
+  /* Preallocate link so we can put the message back on failure */
+  message_link = _dbus_list_alloc_link (NULL);
+  if (message_link)
+    return FALSE;
+  
   dbus_mutex_lock (connection->mutex);
 
   /* We need to ref the connection since the callback could potentially
@@ -1292,13 +1531,20 @@ dbus_connection_dispatch_message (DBusConnection *connection)
       dbus_connection_unref (connection);
       return FALSE;
     }
-
+  
+  message_link->data = message;
+  
   result = DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS;
 
+  reply_serial = _dbus_message_get_reply_serial (message);
+  reply_handler_data = _dbus_hash_table_lookup_int (connection->pending_replies,
+                                                   reply_serial);
+  
   if (!_dbus_list_copy (&connection->filter_list, &filter_list_copy))
     {
       _dbus_connection_release_dispatch (connection);
       dbus_mutex_unlock (connection->mutex);
+      _dbus_connection_failed_pop (connection, message_link);
       dbus_connection_unref (connection);
       return FALSE;
     }
@@ -1332,15 +1578,40 @@ dbus_connection_dispatch_message (DBusConnection *connection)
   _dbus_list_clear (&filter_list_copy);
   
   dbus_mutex_lock (connection->mutex);
+
+  /* Did a reply we were waiting on get filtered? */
+  if (reply_handler_data && result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE)
+    {
+      /* Queue the timeout immediately! */
+      if (reply_handler_data->timeout_link)
+       {
+         _dbus_connection_queue_synthesized_message_link (connection,
+                                                          reply_handler_data->timeout_link);
+         reply_handler_data->timeout_link = NULL;
+       }
+      else
+       {
+         /* We already queued the timeout? Then it was filtered! */
+         _dbus_warn ("The timeout for the reply to %d was filtered\n", reply_serial);
+       }
+    }
   
   if (result == DBUS_HANDLER_RESULT_REMOVE_MESSAGE)
     goto out;
 
+  if (reply_handler_data)
+    {
+      dbus_mutex_unlock (connection->mutex);
+      result = _dbus_message_handler_handle_message (reply_handler_data->handler,
+                                                    connection, message);
+      reply_handler_data_free (reply_handler_data);
+      dbus_mutex_lock (connection->mutex);
+      goto out;
+    }
+  
   name = dbus_message_get_name (message);
   if (name != NULL)
     {
-      DBusMessageHandler *handler;
-      
       handler = _dbus_hash_table_lookup_string (connection->handler_table,
                                                 name);
       if (handler != NULL)
@@ -1359,6 +1630,7 @@ dbus_connection_dispatch_message (DBusConnection *connection)
  out:
   _dbus_connection_release_dispatch (connection);
   dbus_mutex_unlock (connection->mutex);
+  _dbus_list_free_link (message_link);
   dbus_connection_unref (connection);
   dbus_message_unref (message);
   
index fd861fe..6e83dae 100644 (file)
@@ -54,6 +54,7 @@ struct DBusError
 #define DBUS_ERROR_SPAWN_FAILED               "org.freedesktop.DBus.Error.Spawn.Failed"
 #define DBUS_ERROR_NO_MEMORY                  "org.freedesktop.DBus.Error.NoMemory"
 #define DBUS_ERROR_SERVICE_DOES_NOT_EXIST     "org.freedesktop.DBus.Error.ServiceDoesNotExist"
+#define DBUS_ERROR_NO_REPLY                   "org.freedesktop.DBus.Error.NoReply"
 
 typedef enum
 {