From: David Zeuthen Date: Wed, 30 Jun 2010 15:43:42 +0000 (-0400) Subject: Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages X-Git-Tag: 2.25.11~121 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=038d03cd08bdb42e6f83f6041ec01732476e900b;p=platform%2Fupstream%2Fglib.git Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages Without this guarantee, peer-to-peer connections are not very useful. However, with this guarantee it's possible to export objects in a handler for the GDBusServer::new-connection signal. There are two caveats with this patch - it won't work on message bus connections - we don't queue up messages to be written that can be addresses later if needed. https://bugzilla.gnome.org/show_bug.cgi?id=623142 Signed-off-by: David Zeuthen --- diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index b4f3dd2..420111b 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -2390,6 +2390,7 @@ g_dbus_connection_new_sync g_dbus_connection_new_for_address g_dbus_connection_new_for_address_finish g_dbus_connection_new_for_address_sync +g_dbus_connection_start_message_processing GDBusCapabilityFlags g_dbus_connection_close g_dbus_connection_is_closed diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c index 98f705e..189541b 100644 --- a/gio/gdbusconnection.c +++ b/gio/gdbusconnection.c @@ -830,6 +830,23 @@ g_dbus_connection_get_stream (GDBusConnection *connection) return connection->priv->stream; } +/** + * g_dbus_connection_start_message_processing: + * @connection: A #GDBusConnection. + * + * If @connection was created with + * %G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, this method + * starts processing messages. Does nothing on if @connection wasn't + * created with this flag or if the method has already been called. + * + * Since: 2.26 + */ +void +g_dbus_connection_start_message_processing (GDBusConnection *connection) +{ + g_return_if_fail (G_IS_DBUS_CONNECTION (connection)); + _g_dbus_worker_unfreeze (connection->priv->worker); +} /** * g_dbus_connection_is_closed: @@ -1877,16 +1894,27 @@ initable_init (GInitable *initable, connection->priv->worker = _g_dbus_worker_new (connection->priv->stream, connection->priv->capabilities, + (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING), on_worker_message_received, on_worker_message_about_to_be_sent, on_worker_closed, connection); - /* if a bus connection, invoke org.freedesktop.DBus.Hello - this is how we're getting a name */ + /* if a bus connection, call org.freedesktop.DBus.Hello - this is how we're getting a name */ if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION) { GVariant *hello_result; + /* we could lift this restriction by adding code in gdbusprivate.c */ + if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) + { + g_set_error_literal (&connection->priv->initialization_error, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Cannot use DELAY_MESSAGE_PROCESSING with MESSAGE_BUS_CONNECTION"); + goto out; + } + hello_result = g_dbus_connection_call_sync (connection, "org.freedesktop.DBus", /* name */ "/org/freedesktop/DBus", /* path */ diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h index d80daf2..4f85159 100644 --- a/gio/gdbusconnection.h +++ b/gio/gdbusconnection.h @@ -128,6 +128,7 @@ GDBusConnection *g_dbus_connection_new_for_address_sync (const gchar /* ---------------------------------------------------------------------------------------------------- */ +void g_dbus_connection_start_message_processing (GDBusConnection *connection); gboolean g_dbus_connection_is_closed (GDBusConnection *connection); void g_dbus_connection_close (GDBusConnection *connection); GIOStream *g_dbus_connection_get_stream (GDBusConnection *connection); diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 3a48d1d..5208ad9 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -350,7 +350,16 @@ _g_dbus_shared_thread_unref (void) struct GDBusWorker { volatile gint ref_count; + gboolean stopped; + + /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently + * only affects messages received from the other peer (since GDBusServer is the + * only user) - we might want it to affect messages sent to the other peer too? + */ + gboolean frozen; + GQueue *received_messages_while_frozen; + GIOStream *stream; GDBusCapabilityFlags capabilities; GCancellable *cancellable; @@ -406,11 +415,13 @@ _g_dbus_worker_unref (GDBusWorker *worker) if (worker->read_fd_list != NULL) g_object_unref (worker->read_fd_list); + g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL); + g_queue_free (worker->received_messages_while_frozen); + g_mutex_free (worker->write_lock); - g_queue_foreach (worker->write_queue, - (GFunc) message_to_write_data_free, - NULL); + g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL); g_queue_free (worker->write_queue); + g_free (worker); } } @@ -443,6 +454,66 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, return ret; } +/* can only be called from private thread with read-lock held - takes ownership of @message */ +static void +_g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker, + GDBusMessage *message) +{ + if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0) + { + /* queue up */ + g_queue_push_tail (worker->received_messages_while_frozen, message); + } + else + { + /* not frozen, nor anything in queue */ + _g_dbus_worker_emit_message_received (worker, message); + g_object_unref (message); + } +} + +/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ +static gboolean +unfreeze_in_idle_cb (gpointer user_data) +{ + GDBusWorker *worker = user_data; + GDBusMessage *message; + + g_mutex_lock (worker->read_lock); + if (worker->frozen) + { + while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL) + { + _g_dbus_worker_emit_message_received (worker, message); + g_object_unref (message); + } + worker->frozen = FALSE; + } + else + { + g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0); + } + g_mutex_unlock (worker->read_lock); + return FALSE; +} + +/* can be called from any thread */ +void +_g_dbus_worker_unfreeze (GDBusWorker *worker) +{ + GSource *idle_source; + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + unfreeze_in_idle_cb, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_attach (idle_source, shared_thread_data->context); + g_source_unref (idle_source); +} + +/* ---------------------------------------------------------------------------------------------------- */ + static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ @@ -639,8 +710,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, } /* yay, got a message, go deliver it */ - _g_dbus_worker_emit_message_received (worker, message); - g_object_unref (message); + _g_dbus_worker_queue_or_deliver_received_message (worker, message); /* start reading another message! */ worker->read_buffer_bytes_wanted = 0; @@ -952,6 +1022,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data) GDBusWorker * _g_dbus_worker_new (GIOStream *stream, GDBusCapabilityFlags capabilities, + gboolean initially_frozen, GDBusWorkerMessageReceivedCallback message_received_callback, GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback, GDBusWorkerDisconnectedCallback disconnected_callback, @@ -976,6 +1047,9 @@ _g_dbus_worker_new (GIOStream *stream, worker->capabilities = capabilities; worker->cancellable = g_cancellable_new (); + worker->frozen = initially_frozen; + worker->received_messages_while_frozen = g_queue_new (); + worker->write_lock = g_mutex_new (); worker->write_queue = g_queue_new (); diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h index 0d9cb61..766bb98 100644 --- a/gio/gdbusprivate.h +++ b/gio/gdbusprivate.h @@ -53,6 +53,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker, */ GDBusWorker *_g_dbus_worker_new (GIOStream *stream, GDBusCapabilityFlags capabilities, + gboolean initially_frozen, GDBusWorkerMessageReceivedCallback message_received_callback, GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback, GDBusWorkerDisconnectedCallback disconnected_callback, @@ -67,6 +68,9 @@ void _g_dbus_worker_send_message (GDBusWorker *worker, /* can be called from any thread */ void _g_dbus_worker_stop (GDBusWorker *worker); +/* can be called from any thread */ +void _g_dbus_worker_unfreeze (GDBusWorker *worker); + /* ---------------------------------------------------------------------------------------------------- */ void _g_dbus_initialize (void); diff --git a/gio/gdbusserver.c b/gio/gdbusserver.c index 85adccc..5195ba0 100644 --- a/gio/gdbusserver.c +++ b/gio/gdbusserver.c @@ -367,6 +367,11 @@ g_dbus_server_class_init (GDBusServerClass *klass) * linkend="g-main-context-push-thread-default">thread-default main * loop of the thread that @server was constructed in. * + * You are guaranteed that signal handlers for this signal runs + * before incoming messages on @connection are processed. This means + * that it's suitable to call g_dbus_connection_register_object() or + * similar from the signal handler. + * * Since: 2.26 */ _signals[NEW_CONNECTION_SIGNAL] = g_signal_new ("new-connection", @@ -889,6 +894,7 @@ emit_new_connection_in_idle (gpointer user_data) _signals[NEW_CONNECTION_SIGNAL], 0, data->connection); + g_dbus_connection_start_message_processing (data->connection); g_object_unref (data->connection); return FALSE; @@ -925,7 +931,9 @@ on_run (GSocketService *service, goto out; } - connection_flags = G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER; + connection_flags = + G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER | + G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING; if (server->priv->flags & G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS) connection_flags |= G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS; @@ -944,6 +952,7 @@ on_run (GSocketService *service, _signals[NEW_CONNECTION_SIGNAL], 0, connection); + g_dbus_connection_start_message_processing (connection); g_object_unref (connection); } else diff --git a/gio/gio.symbols b/gio/gio.symbols index ebc3d15..0dd8b79 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -1529,6 +1529,7 @@ g_dbus_connection_new_for_address g_dbus_connection_new_for_address_finish g_dbus_connection_new_for_address_sync g_dbus_connection_new_sync +g_dbus_connection_start_message_processing g_dbus_connection_get_capabilities g_dbus_connection_get_exit_on_close g_dbus_connection_get_guid diff --git a/gio/gioenums.h b/gio/gioenums.h index 9d1e126..feb5d03 100644 --- a/gio/gioenums.h +++ b/gio/gioenums.h @@ -969,6 +969,8 @@ typedef enum * method. * @G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION: Pass this flag if connecting to a peer that is a * message bus. This means that the Hello() method will be invoked as part of the connection setup. + * @G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING: If set, processing of D-Bus messages is + * delayed until g_dbus_connection_start_message_processing() is called. * * Flags used when creating a new #GDBusConnection. * @@ -979,7 +981,8 @@ typedef enum { G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT = (1<<0), G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER = (1<<1), G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS = (1<<2), - G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3) + G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3), + G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING = (1<<4) } GDBusConnectionFlags; /** diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c index 7d81372..71bad62 100644 --- a/gio/tests/gdbus-peer.c +++ b/gio/tests/gdbus-peer.c @@ -732,6 +732,192 @@ test_peer (void) /* ---------------------------------------------------------------------------------------------------- */ +typedef struct +{ + GDBusServer *server; + GMainContext *context; + GMainLoop *loop; + + GList *connections; +} DmpData; + +static void +dmp_data_free (DmpData *data) +{ + g_main_loop_unref (data->loop); + g_main_context_unref (data->context); + g_object_unref (data->server); + g_list_foreach (data->connections, (GFunc) g_object_unref, NULL); + g_list_free (data->connections); + g_free (data); +} + +static void +dmp_on_method_call (GDBusConnection *connection, + const gchar *sender, + const gchar *object_path, + const gchar *interface_name, + const gchar *method_name, + GVariant *parameters, + GDBusMethodInvocation *invocation, + gpointer user_data) +{ + //DmpData *data = user_data; + gint32 first; + gint32 second; + g_variant_get (parameters, + "(ii)", + &first, + &second); + g_dbus_method_invocation_return_value (invocation, + g_variant_new ("(i)", first + second)); +} + +static const GDBusInterfaceVTable dmp_interface_vtable = +{ + dmp_on_method_call, + NULL, /* get_property */ + NULL /* set_property */ +}; + + +/* Runs in thread we created GDBusServer in (since we didn't pass G_DBUS_SERVER_FLAGS_RUN_IN_THREAD) */ +static void +dmp_on_new_connection (GDBusServer *server, + GDBusConnection *connection, + gpointer user_data) +{ + DmpData *data = user_data; + GDBusNodeInfo *node; + GError *error; + + /* accept the connection */ + data->connections = g_list_prepend (data->connections, g_object_ref (connection)); + + error = NULL; + node = g_dbus_node_info_new_for_xml ("" + " " + " " + " " + " " + " " + " " + " " + "", + &error); + g_assert_no_error (error); + + /* sleep 100ms before exporting an object - this is to test that + * G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING really works + * (GDBusServer uses this feature). + */ + usleep (100 * 1000); + + /* export an object */ + error = NULL; + g_dbus_connection_register_object (connection, + "/dmp/test", + node->interfaces[0], + &dmp_interface_vtable, + data, + NULL, + &error); + //g_dbus_node_info_unref (node); +} + +static gpointer +dmp_thread_func (gpointer user_data) +{ + DmpData *data = user_data; + GError *error; + gchar *guid; + + data->context = g_main_context_new (); + g_main_context_push_thread_default (data->context); + + error = NULL; + guid = g_dbus_generate_guid (); + data->server = g_dbus_server_new_sync ("nonce-tcp:", + G_DBUS_SERVER_FLAGS_NONE, + guid, + NULL, /* GDBusAuthObserver */ + NULL, /* GCancellable */ + &error); + g_assert_no_error (error); + g_signal_connect (data->server, + "new-connection", + G_CALLBACK (dmp_on_new_connection), + data); + + g_dbus_server_start (data->server); + + data->loop = g_main_loop_new (data->context, FALSE); + g_main_loop_run (data->loop); + + g_main_context_pop_thread_default (data->context); + + g_free (guid); + return NULL; +} + +static void +delayed_message_processing (void) +{ + GError *error; + DmpData *data; + GThread *service_thread; + guint n; + + data = g_new0 (DmpData, 1); + + error = NULL; + service_thread = g_thread_create (dmp_thread_func, + data, + TRUE, + &error); + while (data->server == NULL || !g_dbus_server_is_active (data->server)) + g_thread_yield (); + + for (n = 0; n < 5; n++) + { + GDBusConnection *c; + GVariant *res; + gint32 val; + + error = NULL; + c = g_dbus_connection_new_for_address_sync (g_dbus_server_get_client_address (data->server), + G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT, + NULL, /* GDBusAuthObserver */ + NULL, /* GCancellable */ + &error); + g_assert_no_error (error); + + error = NULL; + res = g_dbus_connection_call_sync (c, + NULL, /* bus name */ + "/dmp/test", + "org.gtk.GDBus.DmpInterface", + "AddPair", + g_variant_new ("(ii)", 2, n), + G_VARIANT_TYPE ("(i)"), + G_DBUS_CALL_FLAGS_NONE, + -1, /* timeout_msec */ + NULL, /* GCancellable */ + &error); + g_assert_no_error (error); + g_variant_get (res, "(i)", &val); + g_assert_cmpint (val, ==, 2 + n); + g_variant_unref (res); + g_object_unref (c); + } + + g_main_loop_quit (data->loop); + g_thread_join (service_thread); + dmp_data_free (data); +} + +/* ---------------------------------------------------------------------------------------------------- */ + int main (int argc, char *argv[]) @@ -753,6 +939,7 @@ main (int argc, loop = g_main_loop_new (NULL, FALSE); g_test_add_func ("/gdbus/peer-to-peer", test_peer); + g_test_add_func ("/gdbus/delayed-message-processing", delayed_message_processing); ret = g_test_run();