+2003-02-26 Alexander Larsson <alexl@redhat.com>
+
+ * configure.in:
+ Set DBUS_GLIB_THREADS_LIBS for apps using gthread-2.0
+
+ * dbus/dbus-connection.c:
+ * dbus/dbus-connection.h:
+ Fix _dbus_connection_acquire_io_path and _dbus_connection_acquire_dispatch.
+ Add dbus_connection_set_wakeup_main_function and use it when queueing
+ incoming and outgoing messages.
+
+
+ * dbus/dbus-dataslot.c:
+ Threadsafe usage of DBusDataSlotAllocator
+
+ * dbus/dbus-message.c: (dbus_message_get_args_iter):
+ dbus_new can fail.
+
+ * dbus/dbus-server-unix.c:
+ Add todo comment
+
+ * glib/dbus-gmain.c:
+ Implement the new wakeup functions for glib.
+
+ * glib/Makefile.am:
+ * glib/test-thread-client.c:
+ * glib/test-thread-server.c:
+ * glib/test-thread.h:
+ Initial cut at some thread test code. Not really done yet.
+
2003-02-26 Havoc Pennington <hp@pobox.com>
* dbus/dbus-connection.c
# Glib detection
PKG_CHECK_MODULES(DBUS_GLIB, glib-2.0, have_glib=yes, have_glib=no)
+PKG_CHECK_MODULES(DBUS_GLIB_THREADS, glib-2.0 gthread-2.0)
if test x$have_glib = xno ; then
AC_MSG_WARN([GLib development libraries not found])
dnl GLib flags
AC_SUBST(DBUS_GLIB_CFLAGS)
AC_SUBST(DBUS_GLIB_LIBS)
+AC_SUBST(DBUS_GLIB_THREADS_LIBS)
# Qt detection
have_qt=no
int client_serial; /**< Client serial. Increments each time a message is sent */
DBusList *disconnect_message_link; /**< Preallocated list node for queueing the disconnection message */
+
+ DBusWakeupMainFunction wakeup_main_function; /**< Function to wake up the mainloop */
+ void *wakeup_main_data; /**< Application data for wakeup_main_function */
+ DBusFreeFunction free_wakeup_main_data; /**< free wakeup_main_data */
};
typedef struct
dbus_mutex_unlock (connection->mutex);
}
+/**
+ * Wakes up the main loop if it is sleeping
+ * Needed if we're e.g. queueing outgoing messages
+ * on a thread while the mainloop sleeps.
+ *
+ * @param connection the connection.
+ */
+static void
+_dbus_connection_wakeup_mainloop (DBusConnection *connection)
+{
+ if (connection->wakeup_main_function)
+ (*connection->wakeup_main_function) (connection->wakeup_main_data);
+}
/**
* Adds a message to the incoming message queue, returning #FALSE
dbus_message_ref (message);
connection->n_incoming += 1;
+ _dbus_connection_wakeup_mainloop (connection);
+
_dbus_verbose ("Incoming message %p added to queue, %d incoming\n",
message, connection->n_incoming);
connection->n_incoming += 1;
+ _dbus_connection_wakeup_mainloop (connection);
+
_dbus_verbose ("Incoming synthesized message %p added to queue, %d incoming\n",
link->data, connection->n_incoming);
}
int timeout_milliseconds)
{
dbus_bool_t res = TRUE;
- if (timeout_milliseconds != -1)
- res = dbus_condvar_wait_timeout (connection->io_path_cond,
- connection->mutex,
- timeout_milliseconds);
- else
- dbus_condvar_wait (connection->io_path_cond, connection->mutex);
+ if (connection->io_path_acquired)
+ {
+ if (timeout_milliseconds != -1)
+ res = dbus_condvar_wait_timeout (connection->io_path_cond,
+ connection->mutex,
+ timeout_milliseconds);
+ else
+ dbus_condvar_wait (connection->io_path_cond, connection->mutex);
+ }
+
if (res)
{
_dbus_assert (!connection->io_path_acquired);
DBusHashIter iter;
DBusList *link;
+ /* You have to disconnect the connection before unref:ing it. Otherwise
+ * you won't get the disconnected message.
+ */
_dbus_assert (!_dbus_transport_get_is_connected (connection->transport));
if (connection->connection_counter != NULL)
dbus_bool_t
dbus_connection_send_message (DBusConnection *connection,
DBusMessage *message,
- dbus_int32_t *client_serial,
+ dbus_int32_t *client_serial,
DBusResultCode *result)
{
if (connection->n_outgoing == 1)
_dbus_transport_messages_pending (connection->transport,
- connection->n_outgoing);
+ connection->n_outgoing);
+
+ _dbus_connection_wakeup_mainloop (connection);
dbus_mutex_unlock (connection->mutex);
static void
_dbus_connection_acquire_dispatch (DBusConnection *connection)
{
- dbus_condvar_wait (connection->dispatch_cond, connection->mutex);
+ if (connection->dispatch_acquired)
+ dbus_condvar_wait (connection->dispatch_cond, connection->mutex);
_dbus_assert (!connection->dispatch_acquired);
connection->dispatch_acquired = TRUE;
dbus_connection_unref (connection);
}
+/**
+ * Sets the mainloop wakeup function for the connection. Thi function is
+ * responsible for waking up the main loop (if its sleeping) when some some
+ * change has happened to the connection that the mainloop needs to reconsiders
+ * (e.g. a message has been queued for writing).
+ * When using Qt, this typically results in a call to QEventLoop::wakeUp().
+ * When using GLib, it would call g_main_context_wakeup().
+ *
+ *
+ * @param connection the connection.
+ * @param wakeup_main_function function to wake up the mainloop
+ * @param data data to pass wakeup_main_function
+ * @param free_data_function function to be called to free the data.
+ */
+void
+dbus_connection_set_wakeup_main_function (DBusConnection *connection,
+ DBusWakeupMainFunction wakeup_main_function,
+ void *data,
+ DBusFreeFunction free_data_function)
+{
+ void *old_data;
+ DBusFreeFunction old_free_data;
+
+ dbus_mutex_lock (connection->mutex);
+ old_data = connection->wakeup_main_data;
+ old_free_data = connection->free_wakeup_main_data;
+
+ connection->wakeup_main_function = wakeup_main_function;
+ connection->wakeup_main_data = data;
+ connection->free_wakeup_main_data = free_data_function;
+
+ dbus_mutex_unlock (connection->mutex);
+
+ /* Callback outside the lock */
+ if (old_free_data)
+ (*old_free_data) (old_data);
+}
+
/**
* Called to notify the connection when a previously-added watch
* is ready for reading or writing, or has an exception such
typedef void (* DBusRemoveWatchFunction) (DBusWatch *watch,
void *data);
+typedef void (* DBusWakeupMainFunction) (void *data);
typedef void (* DBusAddTimeoutFunction) (DBusTimeout *timeout,
void *data);
typedef void (* DBusRemoveTimeoutFunction) (DBusTimeout *timeout,
DBusResultCode *result);
-void dbus_connection_set_watch_functions (DBusConnection *connection,
- DBusAddWatchFunction add_function,
- DBusRemoveWatchFunction remove_function,
- void *data,
- DBusFreeFunction free_data_function);
-void dbus_connection_set_timeout_functions (DBusConnection *connection,
- DBusAddTimeoutFunction add_function,
- DBusRemoveTimeoutFunction remove_function,
- void *data,
- DBusFreeFunction free_data_function);
-void dbus_connection_handle_watch (DBusConnection *connection,
- DBusWatch *watch,
- unsigned int condition);
+void dbus_connection_set_watch_functions (DBusConnection *connection,
+ DBusAddWatchFunction add_function,
+ DBusRemoveWatchFunction remove_function,
+ void *data,
+ DBusFreeFunction free_data_function);
+void dbus_connection_set_timeout_functions (DBusConnection *connection,
+ DBusAddTimeoutFunction add_function,
+ DBusRemoveTimeoutFunction remove_function,
+ void *data,
+ DBusFreeFunction free_data_function);
+void dbus_connection_set_wakeup_main_function (DBusConnection *connection,
+ DBusWakeupMainFunction wakeup_main_function,
+ void *data,
+ DBusFreeFunction free_data_function);
+void dbus_connection_handle_watch (DBusConnection *connection,
+ DBusWatch *watch,
+ unsigned int condition);
DBusFreeFunction free_data_func,
DBusFreeFunction *old_free_func,
void **old_data)
-{
+{
+#ifndef DBUS_DISABLE_ASSERT
+ /* We need to take the allocator lock here, because the allocator could
+ * be e.g. realloc()ing allocated_slots. We avoid doing this if asserts
+ * are disabled, since then the asserts are empty.
+ */
+ if (!dbus_mutex_lock (allocator->lock))
+ return FALSE;
_dbus_assert (slot < allocator->n_allocated_slots);
_dbus_assert (allocator->allocated_slots[slot] == slot);
+ dbus_mutex_unlock (allocator->lock);
+#endif
if (slot >= list->n_slots)
{
DBusDataSlotList *list,
int slot)
{
+#ifndef DBUS_DISABLE_ASSERT
+ /* We need to take the allocator lock here, because the allocator could
+ * be e.g. realloc()ing allocated_slots. We avoid doing this if asserts
+ * are disabled, since then the asserts are empty.
+ */
+ if (!dbus_mutex_lock (allocator->lock))
+ return FALSE;
_dbus_assert (slot < allocator->n_allocated_slots);
_dbus_assert (allocator->allocated_slots[slot] == slot);
+ dbus_mutex_unlock (allocator->lock);
+#endif
if (slot >= list->n_slots)
return NULL;
iter = dbus_new (DBusMessageIter, 1);
- dbus_message_ref (message);
+ if (iter != NULL)
+ {
+ dbus_message_ref (message);
+
+ iter->refcount = 1;
+ iter->message = message;
+ iter->pos = 0;
+ }
- iter->refcount = 1;
- iter->message = message;
- iter->pos = 0;
-
return iter;
}
dbus_free (server);
}
+/**
+ * @todo unreffing the connection at the end may cause
+ * us to drop the last ref to the connection before
+ * disconnecting it. That is invalid.
+ */
static void
handle_new_client_fd (DBusServer *server,
int client_fd)
if DBUS_BUILD_TESTS
-noinst_PROGRAMS= test-dbus-glib
+noinst_PROGRAMS= test-dbus-glib test-thread-server test-thread-client
test_dbus_glib_SOURCES= \
- test-dbus-glib.c
+ test-dbus-glib.c
test_dbus_glib_LDADD= $(top_builddir)/glib/libdbus-glib-1.la
+test_thread_server_SOURCES= \
+ test-thread-server.c \
+ test-thread.h
+
+test_thread_server_LDADD= $(DBUS_GLIB_THREADS_LIBS) $(top_builddir)/glib/libdbus-glib-1.la
+
+test_thread_client_SOURCES= \
+ test-thread-client.c \
+ test-thread.h
+
+test_thread_client_LDADD= $(DBUS_GLIB_THREADS_LIBS) $(top_builddir)/glib/libdbus-glib-1.la
+
endif
g_source_destroy (source);
}
+static void
+wakeup_main (void *data)
+{
+ g_main_context_wakeup (NULL);
+}
+
+
/** @} */ /* End of GLib bindings internals */
/** @addtogroup DBusGLib
remove_timeout,
NULL, NULL);
+ dbus_connection_set_wakeup_main_function (connection,
+ wakeup_main,
+ NULL, NULL);
+
g_source_attach (source, NULL);
g_static_mutex_lock (&connection_slot_lock);
add_timeout,
remove_timeout,
NULL, NULL);
-
+
g_source_attach (source, NULL);
g_static_mutex_lock (&server_slot_lock);
--- /dev/null
+#include <glib.h>
+#include "dbus-glib.h"
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "test-thread.h"
+
+DBusConnection *connection;
+
+static gpointer
+thread_func (gpointer data)
+{
+ gint32 threadnr = GPOINTER_TO_INT (data);
+ guint32 counter = 0;
+ DBusMessage *message;
+ char *str;
+
+ while (1)
+ {
+ message = dbus_message_new (NULL, "org.freedesktop.ThreadTest");
+
+ if (!dbus_message_append_int32 (message, threadnr))
+ {
+ g_print ("thread %d: append threadnr failed\n", threadnr);
+ }
+
+ if (!dbus_message_append_uint32 (message, counter))
+ {
+ g_print ("thread %d: append counter (%d) failed\n", threadnr, counter);
+ }
+
+ str = g_strdup_printf ("Thread %d-%d\n", threadnr, counter);
+ if (!dbus_message_append_string (message, str))
+ {
+ g_print ("thread %d: append string (%s) failed\n", threadnr, str);
+ }
+ g_free (str);
+
+ if (!dbus_connection_send_message (connection,
+ message,
+ NULL, NULL))
+ {
+ g_print ("thread %d: send message failerd\n", threadnr);
+ }
+ dbus_message_unref (message);
+
+ counter ++;
+ }
+
+ return NULL;
+}
+
+int
+main (int argc, char *argv[])
+{
+ GMainLoop *loop;
+ DBusResultCode result;
+ int i;
+
+ g_thread_init (NULL);
+ dbus_gthread_init ();
+
+ if(argc < 2)
+ {
+ g_error("Need an address as argv[1]\n");
+ return 1;
+ }
+
+ connection = dbus_connection_open (argv[1], &result);
+ if (connection == NULL)
+ {
+ g_printerr ("could not open connection\n");
+ return 1;
+ }
+
+ dbus_connection_setup_with_g_main (connection);
+
+ for (i = 0; i < N_TEST_THREADS; i++)
+ {
+ g_thread_create (thread_func, GINT_TO_POINTER (i), FALSE, NULL);
+ }
+
+ loop = g_main_loop_new (NULL, FALSE);
+ g_main_run (loop);
+
+ return 0;
+}
+
--- /dev/null
+#include <glib.h>
+#include "dbus-glib.h"
+#include <stdio.h>
+#include <string.h>
+
+#include "test-thread.h"
+
+typedef struct {
+ guint32 counters[N_TEST_THREADS];
+} ThreadTestData;
+
+static ThreadTestData *
+thread_test_data_new (void)
+{
+ ThreadTestData *data;
+
+ data = g_new0 (ThreadTestData, 1);
+
+ return data;
+}
+
+static void
+thread_test_data_free (ThreadTestData *data)
+{
+ g_free (data);
+}
+
+static DBusMessageHandler *disconnect_handler;
+static DBusMessageHandler *filter_handler;
+static int handler_slot;
+
+static DBusHandlerResult
+handle_test_message (DBusMessageHandler *handler,
+ DBusConnection *connection,
+ DBusMessage *message,
+ void *user_data)
+{
+ ThreadTestData *data = user_data;
+ DBusMessageIter *iter;
+ gint32 threadnr;
+ guint32 counter;
+ char *str, *expected_str;
+ GString *counter_str;
+ int i;
+
+ iter = dbus_message_get_args_iter (message);
+ g_assert (iter != NULL);
+
+ if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_INT32)
+ {
+ g_print ("First arg not right type\n");
+ goto out;
+ }
+ threadnr = dbus_message_iter_get_int32 (iter);
+ if (threadnr < 0 || threadnr >= N_TEST_THREADS)
+ {
+ g_print ("Invalid thread nr\n");
+ goto out;
+ }
+
+ if (! dbus_message_iter_next (iter))
+ {
+ g_print ("Couldn't get second arg\n");
+ goto out;
+ }
+
+ if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_UINT32)
+ {
+ g_print ("Second arg not right type\n");
+ goto out;
+ }
+
+ counter = dbus_message_iter_get_uint32 (iter);
+
+ if (counter != data->counters[threadnr])
+ {
+ g_print ("Thread %d, counter %d, expected %d\n", threadnr, counter, data->counters[threadnr]);
+ goto out;
+ }
+ data->counters[threadnr]++;
+
+ if (! dbus_message_iter_next (iter))
+ {
+ g_print ("Couldn't get third arg\n");
+ goto out;
+ }
+
+ if (dbus_message_iter_get_arg_type (iter) != DBUS_TYPE_STRING)
+ {
+ g_print ("Third arg not right type\n");
+ goto out;
+ }
+
+ str = dbus_message_iter_get_string (iter);
+
+ if (str == NULL)
+ {
+ g_print ("No third arg\n");
+ goto out;
+ }
+
+ expected_str = g_strdup_printf ("Thread %d-%d\n", threadnr, counter);
+ if (strcmp (expected_str, str) != 0)
+ {
+ g_print ("Wrong string '%s', expected '%s'\n", str, expected_str);
+ goto out;
+ }
+ g_free (str);
+ g_free (expected_str);
+
+ if (dbus_message_iter_next (iter))
+ {
+ g_print ("Extra args on end of message\n");
+ goto out;
+ }
+
+ dbus_connection_flush (connection);
+
+ counter_str = g_string_new ("");
+ for (i = 0; i < N_TEST_THREADS; i++)
+ {
+ g_string_append_printf (counter_str, "%d ", data->counters[i]);
+ }
+ g_print ("%s\r", counter_str->str);
+ g_string_free (counter_str, TRUE);
+
+ out:
+ return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS;
+}
+
+static DBusHandlerResult
+handle_filter (DBusMessageHandler *handler,
+ DBusConnection *connection,
+ DBusMessage *message,
+ void *user_data)
+{
+ return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS;
+}
+
+static DBusHandlerResult
+handle_disconnect (DBusMessageHandler *handler,
+ DBusConnection *connection,
+ DBusMessage *message,
+ void *user_data)
+{
+ g_print ("connection disconnected\n");
+ dbus_connection_unref (connection);
+
+ return DBUS_HANDLER_RESULT_ALLOW_MORE_HANDLERS;
+}
+
+
+static void
+new_connection_callback (DBusServer *server,
+ DBusConnection *new_connection,
+ void *user_data)
+{
+ const char *test_messages[] = { "org.freedesktop.ThreadTest" };
+ const char *disconnect_messages[] = { "org.freedesktop.Local.Disconnect" };
+ DBusMessageHandler *test_message_handler;
+ ThreadTestData * data;
+
+ g_print ("new_connection_callback\n");
+
+ dbus_connection_ref (new_connection);
+ dbus_connection_setup_with_g_main (new_connection);
+
+ data = thread_test_data_new ();
+
+ test_message_handler =
+ dbus_message_handler_new (handle_test_message,
+ data, (DBusFreeFunction)thread_test_data_free);
+
+ if (!dbus_connection_register_handler (new_connection,
+ test_message_handler,
+ test_messages, 1))
+ goto nomem;
+
+ if (!dbus_connection_set_data (new_connection,
+ handler_slot,
+ test_message_handler,
+ (DBusFreeFunction)dbus_message_handler_unref))
+ goto nomem;
+
+ if (!dbus_connection_register_handler (new_connection,
+ disconnect_handler,
+ disconnect_messages, 1))
+ goto nomem;
+
+ if (!dbus_connection_add_filter (new_connection,
+ filter_handler))
+ goto nomem;
+
+ return;
+
+ nomem:
+ g_error ("no memory to setup new connection");
+}
+
+int
+main (int argc, char *argv[])
+{
+ GMainLoop *loop;
+ DBusServer *server;
+ DBusResultCode result;
+
+ g_thread_init (NULL);
+ dbus_gthread_init ();
+
+ if (argc < 2)
+ {
+ fprintf (stderr, "Give the server address as an argument\n");
+ return 1;
+ }
+
+ server = dbus_server_listen (argv[1], &result);
+ if (server == NULL)
+ {
+ fprintf (stderr, "Failed to start server on %s: %s\n",
+ argv[1], dbus_result_to_string (result));
+ return 1;
+ }
+
+ handler_slot = dbus_connection_allocate_data_slot ();
+
+ filter_handler =
+ dbus_message_handler_new (handle_filter, NULL, NULL);
+ if (filter_handler == NULL)
+ g_error ("no memory for handler");
+
+ disconnect_handler =
+ dbus_message_handler_new (handle_disconnect, NULL, NULL);
+ if (disconnect_handler == NULL)
+ g_error ("no memory for handler");
+
+ dbus_server_set_new_connection_function (server,
+ new_connection_callback,
+ NULL, NULL);
+
+ dbus_server_setup_with_g_main (server);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ g_main_run (loop);
+
+ return 0;
+}
--- /dev/null
+#define N_TEST_THREADS 5