/* GDBus - GLib D-Bus Library
*
- * Copyright (C) 2008-2009 Red Hat, Inc.
+ * Copyright (C) 2008-2010 Red Hat, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
#include <stdlib.h>
#include <string.h>
-
-#ifdef G_OS_UNIX
-#include <gio/gunixconnection.h>
-#include <gio/gunixfdmessage.h>
-#include "gunixcredentialsmessage.h"
+#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "ginputstream.h"
+#include "gmemoryinputstream.h"
#include "giostream.h"
#include "gsocketcontrolmessage.h"
+#include "gsocketconnection.h"
+#include "gsocketoutputstream.h"
#ifdef G_OS_UNIX
-#include <unistd.h>
#include "gunixfdmessage.h"
#include "gunixconnection.h"
+#include "gunixcredentialsmessage.h"
+#endif
+
+#ifdef G_OS_WIN32
+#include <windows.h>
#endif
#include "glibintl.h"
+static gboolean _g_dbus_worker_do_initial_read (gpointer data);
+
/* ---------------------------------------------------------------------------------------------------- */
-static gchar *
-hexdump (const gchar *data, gsize len, guint indent)
+gchar *
+_g_dbus_hexdump (const gchar *data, gsize len, guint indent)
{
guint n, m;
GString *ret;
g_object_unref (data->socket);
if (data->cancellable != NULL)
g_object_unref (data->cancellable);
+ g_object_unref (data->simple);
g_free (data);
}
else
{
g_assert (error != NULL);
- g_simple_async_result_set_from_error (data->simple, error);
- g_error_free (error);
+ g_simple_async_result_take_error (data->simple, error);
}
if (data->from_mainloop)
/* ---------------------------------------------------------------------------------------------------- */
-G_LOCK_DEFINE_STATIC (shared_thread_lock);
+/* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+
+static GPtrArray *ensured_classes = NULL;
+
+static void
+ensure_type (GType gtype)
+{
+ g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
+}
+
+static void
+release_required_types (void)
+{
+ g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
+ g_ptr_array_unref (ensured_classes);
+ ensured_classes = NULL;
+}
+
+static void
+ensure_required_types (void)
+{
+ g_assert (ensured_classes == NULL);
+ ensured_classes = g_ptr_array_new ();
+ ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
+ ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
+}
+/* ---------------------------------------------------------------------------------------------------- */
typedef struct
{
- gint num_users;
+ volatile gint refcount;
GThread *thread;
GMainContext *context;
GMainLoop *loop;
} SharedThreadData;
-static SharedThreadData *shared_thread_data = NULL;
-
static gpointer
-shared_thread_func (gpointer data)
+gdbus_shared_thread_func (gpointer user_data)
{
- g_main_context_push_thread_default (shared_thread_data->context);
- g_main_loop_run (shared_thread_data->loop);
- g_main_context_pop_thread_default (shared_thread_data->context);
- return NULL;
-}
+ SharedThreadData *data = user_data;
-typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
+ g_main_context_push_thread_default (data->context);
+ g_main_loop_run (data->loop);
+ g_main_context_pop_thread_default (data->context);
-typedef struct
-{
- GDBusSharedThreadFunc func;
- gpointer user_data;
- gboolean done;
-} CallerData;
+ release_required_types ();
-static gboolean
-invoke_caller (gpointer user_data)
-{
- CallerData *data = user_data;
- data->func (data->user_data);
- data->done = TRUE;
- return FALSE;
+ return NULL;
}
-static void
-_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
- gpointer user_data)
-{
- GError *error;
- GSource *idle_source;
- CallerData *data;
+/* ---------------------------------------------------------------------------------------------------- */
- G_LOCK (shared_thread_lock);
+static SharedThreadData *
+_g_dbus_shared_thread_ref (void)
+{
+ static gsize shared_thread_data = 0;
+ GError *error = NULL;
+ SharedThreadData *ret;
- if (shared_thread_data != NULL)
+ if (g_once_init_enter (&shared_thread_data))
{
- shared_thread_data->num_users += 1;
- goto have_thread;
+ SharedThreadData *data;
+
+ /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+ ensure_required_types ();
+
+ data = g_new0 (SharedThreadData, 1);
+ data->refcount = 0;
+
+ data->context = g_main_context_new ();
+ data->loop = g_main_loop_new (data->context, FALSE);
+ data->thread = g_thread_new ("gdbus",
+ gdbus_shared_thread_func,
+ data,
+ TRUE,
+ &error);
+ g_assert_no_error (error);
+ /* We can cast between gsize and gpointer safely */
+ g_once_init_leave (&shared_thread_data, (gsize) data);
}
- shared_thread_data = g_new0 (SharedThreadData, 1);
- shared_thread_data->num_users = 1;
-
- error = NULL;
- shared_thread_data->context = g_main_context_new ();
- shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
- shared_thread_data->thread = g_thread_create (shared_thread_func,
- NULL,
- TRUE,
- &error);
- g_assert_no_error (error);
-
- have_thread:
-
- data = g_new0 (CallerData, 1);
- data->func = func;
- data->user_data = user_data;
- data->done = FALSE;
-
- idle_source = g_idle_source_new ();
- g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
- g_source_set_callback (idle_source,
- invoke_caller,
- data,
- NULL);
- g_source_attach (idle_source, shared_thread_data->context);
- g_source_unref (idle_source);
-
- /* wait for the user code to run.. hmm.. probably use a condition variable instead */
- while (!data->done)
- g_thread_yield ();
-
- g_free (data);
-
- G_UNLOCK (shared_thread_lock);
+ ret = (SharedThreadData*) shared_thread_data;
+ g_atomic_int_inc (&ret->refcount);
+ return ret;
}
static void
-_g_dbus_shared_thread_unref (void)
+_g_dbus_shared_thread_unref (SharedThreadData *data)
{
/* TODO: actually destroy the shared thread here */
#if 0
- G_LOCK (shared_thread_lock);
- g_assert (shared_thread_data != NULL);
- shared_thread_data->num_users -= 1;
- if (shared_thread_data->num_users == 0)
- {
- g_main_loop_quit (shared_thread_data->loop);
- //g_thread_join (shared_thread_data->thread);
- g_main_loop_unref (shared_thread_data->loop);
- g_main_context_unref (shared_thread_data->context);
- g_free (shared_thread_data);
- shared_thread_data = NULL;
- G_UNLOCK (shared_thread_lock);
- }
- else
+ g_assert (data != NULL);
+ if (g_atomic_int_dec_and_test (&data->refcount))
{
- G_UNLOCK (shared_thread_lock);
+ g_main_loop_quit (data->loop);
+ //g_thread_join (data->thread);
+ g_main_loop_unref (data->loop);
+ g_main_context_unref (data->context);
}
#endif
}
struct GDBusWorker
{
volatile gint ref_count;
- gboolean stopped;
- GIOStream *stream;
+
+ SharedThreadData *shared_thread_data;
+
+ /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+ volatile gint stopped;
+
+ /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
+ * only affects messages received from the other peer (since GDBusServer is the
+ * only user) - we might want it to affect messages sent to the other peer too?
+ */
+ gboolean frozen;
GDBusCapabilityFlags capabilities;
+ GQueue *received_messages_while_frozen;
+
+ GIOStream *stream;
GCancellable *cancellable;
GDBusWorkerMessageReceivedCallback message_received_callback;
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
GDBusWorkerDisconnectedCallback disconnected_callback;
gpointer user_data;
- GThread *thread;
-
/* if not NULL, stream is GSocketConnection */
GSocket *socket;
/* used for reading */
- GMutex *read_lock;
+ GMutex read_lock;
gchar *read_buffer;
gsize read_buffer_allocated_size;
gsize read_buffer_cur_size;
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
+ /* TRUE if an async write, flush or close is pending.
+ * Only the worker thread may change its value, and only with the write_lock.
+ * Other threads may read its value when holding the write_lock.
+ * The worker thread may read its value at any time.
+ */
+ gboolean output_pending;
/* used for writing */
- GMutex *write_lock;
+ GMutex write_lock;
+ /* queue of MessageToWriteData, protected by write_lock */
GQueue *write_queue;
- gboolean write_is_pending;
+ /* protected by write_lock */
+ guint64 write_num_messages_written;
+ /* list of FlushData, protected by write_lock */
+ GList *write_pending_flushes;
+ /* list of CloseData, protected by write_lock */
+ GList *pending_close_attempts;
};
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+typedef struct
+{
+ GMutex mutex;
+ GCond cond;
+ guint64 number_to_wait_for;
+ GError *error;
+} FlushData;
+
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;
static void message_to_write_data_free (MessageToWriteData *data);
+static void read_message_print_transport_debug (gssize bytes_read,
+ GDBusWorker *worker);
+
+static void write_message_print_transport_debug (gssize bytes_written,
+ MessageToWriteData *data);
+
+typedef struct {
+ GDBusWorker *worker;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+ if (close_data->cancellable != NULL)
+ g_object_unref (close_data->cancellable);
+
+ if (close_data->result != NULL)
+ g_object_unref (close_data->result);
+
+ _g_dbus_worker_unref (close_data->worker);
+ g_slice_free (CloseData, close_data);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
static GDBusWorker *
_g_dbus_worker_ref (GDBusWorker *worker)
{
{
if (g_atomic_int_dec_and_test (&worker->ref_count))
{
- _g_dbus_shared_thread_unref ();
+ g_assert (worker->write_pending_flushes == NULL);
+
+ _g_dbus_shared_thread_unref (worker->shared_thread_data);
g_object_unref (worker->stream);
- g_mutex_free (worker->read_lock);
+ g_mutex_clear (&worker->read_lock);
g_object_unref (worker->cancellable);
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
- g_mutex_free (worker->write_lock);
- g_queue_foreach (worker->write_queue,
- (GFunc) message_to_write_data_free,
- NULL);
+ g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
+ g_queue_free (worker->received_messages_while_frozen);
+
+ g_mutex_clear (&worker->write_lock);
+ g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (worker->write_queue);
+
+ g_free (worker->read_buffer);
+
g_free (worker);
}
}
gboolean remote_peer_vanished,
GError *error)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
}
static void
-_g_dbus_worker_emit_message (GDBusWorker *worker,
- GDBusMessage *message)
+_g_dbus_worker_emit_message_received (GDBusWorker *worker,
+ GDBusMessage *message)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->message_received_callback (worker, message, worker->user_data);
}
+static GDBusMessage *
+_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
+ GDBusMessage *message)
+{
+ GDBusMessage *ret;
+ if (!g_atomic_int_get (&worker->stopped))
+ ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+ else
+ ret = message;
+ return ret;
+}
+
+/* can only be called from private thread with read-lock held - takes ownership of @message */
+static void
+_g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
+ GDBusMessage *message)
+{
+ if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
+ {
+ /* queue up */
+ g_queue_push_tail (worker->received_messages_while_frozen, message);
+ }
+ else
+ {
+ /* not frozen, nor anything in queue */
+ _g_dbus_worker_emit_message_received (worker, message);
+ g_object_unref (message);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
+static gboolean
+unfreeze_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GDBusMessage *message;
+
+ g_mutex_lock (&worker->read_lock);
+ if (worker->frozen)
+ {
+ while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
+ {
+ _g_dbus_worker_emit_message_received (worker, message);
+ g_object_unref (message);
+ }
+ worker->frozen = FALSE;
+ }
+ else
+ {
+ g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
+ }
+ g_mutex_unlock (&worker->read_lock);
+ return FALSE;
+}
+
+/* can be called from any thread */
+void
+_g_dbus_worker_unfreeze (GDBusWorker *worker)
+{
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ unfreeze_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
GError *error;
gssize bytes_read;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
/* If already stopped, don't even process the reply */
- if (worker->stopped)
+ if (g_atomic_int_get (&worker->stopped))
goto out;
error = NULL;
goto out;
}
+ read_message_print_transport_debug (bytes_read, worker);
+
worker->read_buffer_cur_size += bytes_read;
if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
{
message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
worker->read_buffer_cur_size,
+ worker->capabilities,
&error);
if (message == NULL)
{
+ gchar *s;
+ s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+ g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
+ "The error is: %s\n"
+ "The payload is as follows:\n"
+ "%s\n",
+ worker->read_buffer_cur_size,
+ error->message,
+ s);
+ g_free (s);
_g_dbus_worker_emit_disconnected (worker, FALSE, error);
g_error_free (error);
goto out;
}
+#ifdef G_OS_UNIX
if (worker->read_fd_list != NULL)
{
g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
+ g_object_unref (worker->read_fd_list);
worker->read_fd_list = NULL;
}
+#endif
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
+ _g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Message:\n"
" <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
s = g_dbus_message_print (message, 2);
g_print ("%s", s);
g_free (s);
- s = hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
- g_print ("%s\n", s);
- g_free (s);
+ if (G_UNLIKELY (_g_dbus_debug_payload ()))
+ {
+ s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+ g_print ("%s\n", s);
+ g_free (s);
+ }
+ _g_dbus_debug_print_unlock ();
}
/* yay, got a message, go deliver it */
- _g_dbus_worker_emit_message (worker, message);
- g_object_unref (message);
+ _g_dbus_worker_queue_or_deliver_received_message (worker, message);
/* start reading another message! */
worker->read_buffer_bytes_wanted = 0;
}
out:
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
_g_dbus_worker_unref (worker);
}
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static void
-_g_dbus_worker_do_read (GDBusWorker *worker)
+static gboolean
+_g_dbus_worker_do_initial_read (gpointer data)
{
- g_mutex_lock (worker->read_lock);
+ GDBusWorker *worker = data;
+ g_mutex_lock (&worker->read_lock);
_g_dbus_worker_do_read_unlocked (worker);
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
+ return FALSE;
}
/* ---------------------------------------------------------------------------------------------------- */
struct _MessageToWriteData
{
+ GDBusWorker *worker;
GDBusMessage *message;
gchar *blob;
gsize blob_size;
+
+ gsize total_written;
+ GSimpleAsyncResult *simple;
+
};
static void
message_to_write_data_free (MessageToWriteData *data)
{
- g_object_unref (data->message);
+ _g_dbus_worker_unref (data->worker);
+ if (data->message)
+ g_object_unref (data->message);
g_free (data->blob);
g_free (data);
}
/* ---------------------------------------------------------------------------------------------------- */
-/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
-static gboolean
-write_message (GDBusWorker *worker,
- MessageToWriteData *data,
- GError **error)
-{
- gboolean ret;
-
- g_return_val_if_fail (data->blob_size > 16, FALSE);
+static void write_message_continue_writing (MessageToWriteData *data);
- ret = FALSE;
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+write_message_async_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ GSimpleAsyncResult *simple;
+ gssize bytes_written;
+ GError *error;
- /* First, the initial 16 bytes - special case UNIX sockets here
- * since it may involve writing an ancillary message with file
- * descriptors
+ /* Note: we can't access data->simple after calling g_async_result_complete () because the
+ * callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
-#ifdef G_OS_UNIX
- {
- GOutputVector vector;
- GSocketControlMessage *message;
- GUnixFDList *fd_list;
- gssize bytes_written;
-
- fd_list = g_dbus_message_get_unix_fd_list (data->message);
-
- message = NULL;
- if (fd_list != NULL)
- {
- if (!G_IS_UNIX_CONNECTION (worker->stream))
- {
- g_set_error (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor on unsupported stream of type %s",
- g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
- goto out;
- }
- else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
- {
- g_set_error_literal (error,
- G_IO_ERROR,
- G_IO_ERROR_INVALID_ARGUMENT,
- "Tried sending a file descriptor but remote peer does not support this capability");
- goto out;
- }
- message = g_unix_fd_message_new_with_fd_list (fd_list);
- }
-
- vector.buffer = data->blob;
- vector.size = 16;
-
- bytes_written = g_socket_send_message (worker->socket,
- NULL, /* address */
- &vector,
- 1,
- message != NULL ? &message : NULL,
- message != NULL ? 1 : 0,
- G_SOCKET_MSG_NONE,
- worker->cancellable,
- error);
- if (bytes_written == -1)
- {
- g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
- if (message != NULL)
- g_object_unref (message);
- goto out;
- }
- if (message != NULL)
- g_object_unref (message);
-
- if (bytes_written < 16)
- {
- /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
- * messages are sent?
- */
- g_assert_not_reached ();
- }
- }
-#else
- /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
- (const gchar *) data->blob,
- 16,
- NULL, /* bytes_written */
- worker->cancellable, /* cancellable */
- error))
- goto out;
-#endif
+ simple = data->simple;
- /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
- if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
- (const gchar *) data->blob + 16,
- data->blob_size - 16,
- NULL, /* bytes_written */
- worker->cancellable, /* cancellable */
- error))
- goto out;
+ error = NULL;
+ bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
+ res,
+ &error);
+ if (bytes_written == -1)
+ {
+ g_simple_async_result_take_error (simple, error);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+ g_assert (bytes_written > 0); /* zero is never returned */
- ret = TRUE;
+ write_message_print_transport_debug (bytes_written, data);
- if (G_UNLIKELY (_g_dbus_debug_message ()))
+ data->total_written += bytes_written;
+ g_assert (data->total_written <= data->blob_size);
+ if (data->total_written == data->blob_size)
{
- gchar *s;
- g_print ("========================================================================\n"
- "GDBus-debug:Message:\n"
- " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
- data->blob_size);
- s = g_dbus_message_print (data->message, 2);
- g_print ("%s", s);
- g_free (s);
- s = hexdump (data->blob, data->blob_size, 2);
- g_print ("%s\n", s);
- g_free (s);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
}
+ write_message_continue_writing (data);
+
out:
- return ret;
+ ;
}
-/* ---------------------------------------------------------------------------------------------------- */
-
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static gboolean
-write_message_in_idle_cb (gpointer user_data)
+on_socket_ready (GSocket *socket,
+ GIOCondition condition,
+ gpointer user_data)
{
- GDBusWorker *worker = user_data;
- gboolean more_writes_are_pending;
- MessageToWriteData *data;
- GError *error;
+ MessageToWriteData *data = user_data;
+ write_message_continue_writing (data);
+ return FALSE; /* remove source */
+}
- g_mutex_lock (worker->write_lock);
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+write_message_continue_writing (MessageToWriteData *data)
+{
+ GOutputStream *ostream;
+ GSimpleAsyncResult *simple;
+#ifdef G_OS_UNIX
+ GUnixFDList *fd_list;
+#endif
- data = g_queue_pop_head (worker->write_queue);
- g_assert (data != NULL);
+ /* Note: we can't access data->simple after calling g_async_result_complete () because the
+ * callback can free @data and we're not completing in idle. So use a copy of the pointer.
+ */
+ simple = data->simple;
- error = NULL;
- if (!write_message (worker,
- data,
- &error))
+ ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+ fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
+
+ g_assert (!g_output_stream_has_pending (ostream));
+ g_assert_cmpint (data->total_written, <, data->blob_size);
+
+ if (FALSE)
{
- /* TODO: handle */
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
- g_error_free (error);
}
- message_to_write_data_free (data);
-
- more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
+#ifdef G_OS_UNIX
+ else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
+ {
+ GOutputVector vector;
+ GSocketControlMessage *control_message;
+ gssize bytes_written;
+ GError *error;
- worker->write_is_pending = more_writes_are_pending;
- g_mutex_unlock (worker->write_lock);
+ vector.buffer = data->blob;
+ vector.size = data->blob_size;
- return more_writes_are_pending;
-}
+ control_message = NULL;
+ if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
+ {
+ if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
+ {
+ g_simple_async_result_set_error (simple,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Tried sending a file descriptor but remote peer does not support this capability");
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+ control_message = g_unix_fd_message_new_with_fd_list (fd_list);
+ }
-/* ---------------------------------------------------------------------------------------------------- */
+ error = NULL;
+ bytes_written = g_socket_send_message (data->worker->socket,
+ NULL, /* address */
+ &vector,
+ 1,
+ control_message != NULL ? &control_message : NULL,
+ control_message != NULL ? 1 : 0,
+ G_SOCKET_MSG_NONE,
+ data->worker->cancellable,
+ &error);
+ if (control_message != NULL)
+ g_object_unref (control_message);
-/* can be called from any thread - steals blob */
-void
-_g_dbus_worker_send_message (GDBusWorker *worker,
- GDBusMessage *message,
- gchar *blob,
- gsize blob_len)
-{
- MessageToWriteData *data;
+ if (bytes_written == -1)
+ {
+ /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+ source = g_socket_create_source (data->worker->socket,
+ G_IO_OUT | G_IO_HUP | G_IO_ERR,
+ data->worker->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) on_socket_ready,
+ data,
+ NULL); /* GDestroyNotify */
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ g_error_free (error);
+ goto out;
+ }
+ g_simple_async_result_take_error (simple, error);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+ g_assert (bytes_written > 0); /* zero is never returned */
- g_return_if_fail (G_IS_DBUS_MESSAGE (message));
- g_return_if_fail (blob != NULL);
- g_return_if_fail (blob_len > 16);
+ write_message_print_transport_debug (bytes_written, data);
- data = g_new0 (MessageToWriteData, 1);
- data->message = g_object_ref (message);
- data->blob = blob; /* steal! */
- data->blob_size = blob_len;
+ data->total_written += bytes_written;
+ g_assert (data->total_written <= data->blob_size);
+ if (data->total_written == data->blob_size)
+ {
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
- g_mutex_lock (worker->write_lock);
- g_queue_push_tail (worker->write_queue, data);
- if (!worker->write_is_pending)
+ write_message_continue_writing (data);
+ }
+#endif
+ else
{
- GSource *idle_source;
-
- worker->write_is_pending = TRUE;
+#ifdef G_OS_UNIX
+ if (fd_list != NULL)
+ {
+ g_simple_async_result_set_error (simple,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Tried sending a file descriptor on unsupported stream of type %s",
+ g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+#endif
+
+ g_output_stream_write_async (ostream,
+ (const gchar *) data->blob + data->total_written,
+ data->blob_size - data->total_written,
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ write_message_async_cb,
+ data);
+ }
+ out:
+ ;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+write_message_async (GDBusWorker *worker,
+ MessageToWriteData *data,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ data->simple = g_simple_async_result_new (NULL,
+ callback,
+ user_data,
+ write_message_async);
+ data->total_written = 0;
+ write_message_continue_writing (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static gboolean
+write_message_finish (GAsyncResult *res,
+ GError **error)
+{
+ g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
+ if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
+ return FALSE;
+ else
+ return TRUE;
+}
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void maybe_write_next_message (GDBusWorker *worker);
+
+typedef struct
+{
+ GDBusWorker *worker;
+ GList *flushers;
+} FlushAsyncData;
+
+static void
+flush_data_list_complete (const GList *flushers,
+ const GError *error)
+{
+ const GList *l;
+
+ for (l = flushers; l != NULL; l = l->next)
+ {
+ FlushData *f = l->data;
+
+ f->error = error != NULL ? g_error_copy (error) : NULL;
+
+ g_mutex_lock (&f->mutex);
+ g_cond_signal (&f->cond);
+ g_mutex_unlock (&f->mutex);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+ostream_flush_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ FlushAsyncData *data = user_data;
+ GError *error;
+
+ error = NULL;
+ g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+ res,
+ &error);
+
+ if (error == NULL)
+ {
+ if (G_UNLIKELY (_g_dbus_debug_transport ()))
+ {
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " ---- FLUSHED stream of type %s\n",
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ }
+ }
+
+ g_assert (data->flushers != NULL);
+ flush_data_list_complete (data->flushers, error);
+ g_list_free (data->flushers);
+
+ if (error != NULL)
+ g_error_free (error);
+ /* Make sure we tell folks that we don't have additional
+ flushes pending */
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (&data->worker->write_lock);
+
+ /* OK, cool, finally kick off the next write */
+ maybe_write_next_message (data->worker);
+
+ _g_dbus_worker_unref (data->worker);
+ g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is false on entry
+ */
+static void
+message_written (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ /* first log the fact that we wrote a message */
+ if (G_UNLIKELY (_g_dbus_debug_message ()))
+ {
+ gchar *s;
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Message:\n"
+ " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
+ message_data->blob_size);
+ s = g_dbus_message_print (message_data->message, 2);
+ g_print ("%s", s);
+ g_free (s);
+ if (G_UNLIKELY (_g_dbus_debug_payload ()))
+ {
+ s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
+ g_print ("%s\n", s);
+ g_free (s);
+ }
+ _g_dbus_debug_print_unlock ();
+ }
+
+ /* then first wake up pending flushes and, if needed, flush the stream */
+ flushers = NULL;
+ g_mutex_lock (&worker->write_lock);
+ worker->write_num_messages_written += 1;
+ for (l = worker->write_pending_flushes; l != NULL; l = ll)
+ {
+ FlushData *f = l->data;
+ ll = l->next;
+
+ if (f->number_to_wait_for == worker->write_num_messages_written)
+ {
+ flushers = g_list_append (flushers, f);
+ worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ }
+ }
+ if (flushers != NULL)
+ {
+ g_assert (!worker->output_pending);
+ worker->output_pending = TRUE;
+ }
+ g_mutex_unlock (&worker->write_lock);
+
+ if (flushers != NULL)
+ {
+ FlushAsyncData *data;
+ data = g_new0 (FlushAsyncData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->flushers = flushers;
+ /* flush the stream before writing the next message */
+ g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
+ G_PRIORITY_DEFAULT,
+ worker->cancellable,
+ ostream_flush_cb,
+ data);
+ }
+ else
+ {
+ /* kick off the next write! */
+ maybe_write_next_message (worker);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+write_message_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ GError *error;
+
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (&data->worker->write_lock);
+
+ error = NULL;
+ if (!write_message_finish (res, &error))
+ {
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+ g_error_free (error);
+ }
+
+ /* this function will also kick of the next write (it might need to
+ * flush so writing the next message might happen much later
+ * e.g. async)
+ */
+ message_written (data->worker, data);
+
+ message_to_write_data_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+iostream_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GError *error = NULL;
+ GList *pending_close_attempts, *pending_flush_attempts;
+ GQueue *send_queue;
+
+ g_io_stream_close_finish (worker->stream, res, &error);
+
+ g_mutex_lock (&worker->write_lock);
+
+ pending_close_attempts = worker->pending_close_attempts;
+ worker->pending_close_attempts = NULL;
+
+ pending_flush_attempts = worker->write_pending_flushes;
+ worker->write_pending_flushes = NULL;
+
+ send_queue = worker->write_queue;
+ worker->write_queue = g_queue_new ();
+
+ g_assert (worker->output_pending);
+ worker->output_pending = FALSE;
+
+ g_mutex_unlock (&worker->write_lock);
+
+ while (pending_close_attempts != NULL)
+ {
+ CloseData *close_data = pending_close_attempts->data;
+
+ pending_close_attempts = g_list_delete_link (pending_close_attempts,
+ pending_close_attempts);
+
+ if (close_data->result != NULL)
+ {
+ if (error != NULL)
+ g_simple_async_result_set_from_error (close_data->result, error);
+
+ /* this must be in an idle because the result is likely to be
+ * intended for another thread
+ */
+ g_simple_async_result_complete_in_idle (close_data->result);
+ }
+
+ close_data_free (close_data);
+ }
+
+ g_clear_error (&error);
+
+ /* all messages queued for sending are discarded */
+ g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+ g_queue_free (send_queue);
+
+ /* all queued flushes fail */
+ error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ flush_data_list_complete (pending_flush_attempts, error);
+ g_list_free (pending_flush_attempts);
+ g_clear_error (&error);
+
+ _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be false on entry
+ */
+static void
+maybe_write_next_message (GDBusWorker *worker)
+{
+ MessageToWriteData *data;
+
+ write_next:
+ /* we mustn't try to write two things at once */
+ g_assert (!worker->output_pending);
+
+ g_mutex_lock (&worker->write_lock);
+
+ /* if we want to close the connection, that takes precedence */
+ if (worker->pending_close_attempts != NULL)
+ {
+ worker->output_pending = TRUE;
+
+ g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+ NULL, iostream_close_cb,
+ _g_dbus_worker_ref (worker));
+ data = NULL;
+ }
+ else
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = TRUE;
+ }
+
+ g_mutex_unlock (&worker->write_lock);
+
+ /* Note that write_lock is only used for protecting the @write_queue
+ * and @output_pending fields of the GDBusWorker struct ... which we
+ * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+ *
+ * Therefore, it's fine to drop it here when calling back into user
+ * code and then writing the message out onto the GIOStream since this
+ * function only runs on the worker thread.
+ */
+ if (data != NULL)
+ {
+ GDBusMessage *old_message;
+ guchar *new_blob;
+ gsize new_blob_size;
+ GError *error;
+
+ old_message = data->message;
+ data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (data->message == old_message)
+ {
+ /* filters had no effect - do nothing */
+ }
+ else if (data->message == NULL)
+ {
+ /* filters dropped message */
+ g_mutex_lock (&worker->write_lock);
+ worker->output_pending = FALSE;
+ g_mutex_unlock (&worker->write_lock);
+ message_to_write_data_free (data);
+ goto write_next;
+ }
+ else
+ {
+ /* filters altered the message -> reencode */
+ error = NULL;
+ new_blob = g_dbus_message_to_blob (data->message,
+ &new_blob_size,
+ worker->capabilities,
+ &error);
+ if (new_blob == NULL)
+ {
+ /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+ * the old message instead
+ */
+ g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+ g_dbus_message_get_serial (data->message),
+ error->message);
+ g_error_free (error);
+ }
+ else
+ {
+ g_free (data->blob);
+ data->blob = (gchar *) new_blob;
+ data->blob_size = new_blob_size;
+ }
+ }
+
+ write_message_async (worker,
+ data,
+ write_message_cb,
+ data);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be true or false
+ */
+static gboolean
+write_message_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+
+ /* Because this is the worker thread, we can read this struct member
+ * without holding the lock: no other thread ever modifies it.
+ */
+ if (!worker->output_pending)
+ maybe_write_next_message (worker);
+
+ return FALSE;
+}
+
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+static void
+schedule_write_in_worker_thread (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ CloseData *close_data)
+{
+ g_mutex_lock (&worker->write_lock);
+
+ if (write_data != NULL)
+ g_queue_push_tail (worker->write_queue, write_data);
+
+ if (close_data != NULL)
+ worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+ close_data);
+
+ if (!worker->output_pending)
+ {
+ GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
write_message_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
- g_source_attach (idle_source, shared_thread_data->context);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
- g_mutex_unlock (worker->write_lock);
+
+ g_mutex_unlock (&worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
-static void
-_g_dbus_worker_thread_begin_func (gpointer user_data)
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_send_message (GDBusWorker *worker,
+ GDBusMessage *message,
+ gchar *blob,
+ gsize blob_len)
{
- GDBusWorker *worker = user_data;
+ MessageToWriteData *data;
- worker->thread = g_thread_self ();
+ g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+ g_return_if_fail (blob != NULL);
+ g_return_if_fail (blob_len > 16);
- /* begin reading */
- _g_dbus_worker_do_read (worker);
+ data = g_new0 (MessageToWriteData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->message = g_object_ref (message);
+ data->blob = blob; /* steal! */
+ data->blob_size = blob_len;
+
+ schedule_write_in_worker_thread (worker, data, NULL);
}
+/* ---------------------------------------------------------------------------------------------------- */
+
GDBusWorker *
-_g_dbus_worker_new (GIOStream *stream,
- GDBusCapabilityFlags capabilities,
- GDBusWorkerMessageReceivedCallback message_received_callback,
- GDBusWorkerDisconnectedCallback disconnected_callback,
- gpointer user_data)
+_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,
+ gboolean initially_frozen,
+ GDBusWorkerMessageReceivedCallback message_received_callback,
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
+ GDBusWorkerDisconnectedCallback disconnected_callback,
+ gpointer user_data)
{
GDBusWorker *worker;
+ GSource *idle_source;
g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
g_return_val_if_fail (message_received_callback != NULL, NULL);
+ g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
g_return_val_if_fail (disconnected_callback != NULL, NULL);
worker = g_new0 (GDBusWorker, 1);
worker->ref_count = 1;
- worker->read_lock = g_mutex_new ();
+ g_mutex_init (&worker->read_lock);
worker->message_received_callback = message_received_callback;
+ worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
worker->disconnected_callback = disconnected_callback;
worker->user_data = user_data;
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
+ worker->output_pending = FALSE;
+
+ worker->frozen = initially_frozen;
+ worker->received_messages_while_frozen = g_queue_new ();
- worker->write_lock = g_mutex_new ();
+ g_mutex_init (&worker->write_lock);
worker->write_queue = g_queue_new ();
if (G_IS_SOCKET_CONNECTION (worker->stream))
worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
- _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
+ worker->shared_thread_data = _g_dbus_shared_thread_ref ();
+
+ /* begin reading */
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ _g_dbus_worker_do_initial_read,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
return worker;
}
-/* This can be called from any thread - frees worker - guarantees no callbacks
- * will ever be issued again
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_close (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GSimpleAsyncResult *result)
+{
+ CloseData *close_data;
+
+ close_data = g_slice_new0 (CloseData);
+ close_data->worker = _g_dbus_worker_ref (worker);
+ close_data->cancellable =
+ (cancellable == NULL ? NULL : g_object_ref (cancellable));
+ close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+ g_cancellable_cancel (worker->cancellable);
+ schedule_write_in_worker_thread (worker, NULL, close_data);
+}
+
+/* This can be called from any thread - frees worker. Note that
+ * callbacks might still happen if called from another thread than the
+ * worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- /* If we're called in the worker thread it means we are called from
- * a worker callback. This is fine, we just can't lock in that case since
- * we're already holding the lock...
+ g_atomic_int_set (&worker->stopped, TRUE);
+
+ /* Cancel any pending operations and schedule a close of the underlying I/O
+ * stream in the worker thread
*/
- if (g_thread_self () != worker->thread)
- g_mutex_lock (worker->read_lock);
- worker->stopped = TRUE;
- if (g_thread_self () != worker->thread)
- g_mutex_unlock (worker->read_lock);
+ _g_dbus_worker_close (worker, NULL, NULL);
- g_cancellable_cancel (worker->cancellable);
+ /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+ * thread has run, so we no longer need to unref in an idle like in
+ * commit 322e25b535
+ */
_g_dbus_worker_unref (worker);
}
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks
+ * calling thread until all queued outgoing messages are written and
+ * the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gboolean ret;
+ FlushData *data;
+
+ data = NULL;
+ ret = TRUE;
+
+ /* if the queue is empty, there's nothing to wait for */
+ g_mutex_lock (&worker->write_lock);
+ if (g_queue_get_length (worker->write_queue) > 0)
+ {
+ data = g_new0 (FlushData, 1);
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
+ data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
+ g_mutex_lock (&data->mutex);
+ worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+ }
+ g_mutex_unlock (&worker->write_lock);
+
+ if (data != NULL)
+ {
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
+
+ /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
+ g_cond_clear (&data->cond);
+ g_mutex_clear (&data->mutex);
+ if (data->error != NULL)
+ {
+ ret = FALSE;
+ g_propagate_error (error, data->error);
+ }
+ g_free (data);
+ }
+
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
#define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
-#define G_DBUS_DEBUG_MESSAGE (1<<1)
-#define G_DBUS_DEBUG_ALL 0xffffffff
+#define G_DBUS_DEBUG_TRANSPORT (1<<1)
+#define G_DBUS_DEBUG_MESSAGE (1<<2)
+#define G_DBUS_DEBUG_PAYLOAD (1<<3)
+#define G_DBUS_DEBUG_CALL (1<<4)
+#define G_DBUS_DEBUG_SIGNAL (1<<5)
+#define G_DBUS_DEBUG_INCOMING (1<<6)
+#define G_DBUS_DEBUG_RETURN (1<<7)
+#define G_DBUS_DEBUG_EMISSION (1<<8)
+#define G_DBUS_DEBUG_ADDRESS (1<<9)
+
static gint _gdbus_debug_flags = 0;
gboolean
}
gboolean
+_g_dbus_debug_transport (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
+}
+
+gboolean
_g_dbus_debug_message (void)
{
_g_dbus_initialize ();
return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
}
+gboolean
+_g_dbus_debug_payload (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
+}
+
+gboolean
+_g_dbus_debug_call (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
+}
+
+gboolean
+_g_dbus_debug_signal (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
+}
+
+gboolean
+_g_dbus_debug_incoming (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
+}
+
+gboolean
+_g_dbus_debug_return (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
+}
+
+gboolean
+_g_dbus_debug_emission (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
+}
+
+gboolean
+_g_dbus_debug_address (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
+}
+
+G_LOCK_DEFINE_STATIC (print_lock);
+
+void
+_g_dbus_debug_print_lock (void)
+{
+ G_LOCK (print_lock);
+}
+
+void
+_g_dbus_debug_print_unlock (void)
+{
+ G_UNLOCK (print_lock);
+}
+
/*
* _g_dbus_initialize:
*
const gchar *debug;
g_dbus_error_domain = G_DBUS_ERROR;
+ (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
debug = g_getenv ("G_DBUS_DEBUG");
if (debug != NULL)
{
- gchar **tokens;
- guint n;
- tokens = g_strsplit (debug, ",", 0);
- for (n = 0; tokens[n] != NULL; n++)
- {
- if (g_strcmp0 (tokens[n], "authentication") == 0)
- _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION;
- else if (g_strcmp0 (tokens[n], "message") == 0)
- _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
- else if (g_strcmp0 (tokens[n], "all") == 0)
- _gdbus_debug_flags |= G_DBUS_DEBUG_ALL;
- }
- g_strfreev (tokens);
+ const GDebugKey keys[] = {
+ { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
+ { "transport", G_DBUS_DEBUG_TRANSPORT },
+ { "message", G_DBUS_DEBUG_MESSAGE },
+ { "payload", G_DBUS_DEBUG_PAYLOAD },
+ { "call", G_DBUS_DEBUG_CALL },
+ { "signal", G_DBUS_DEBUG_SIGNAL },
+ { "incoming", G_DBUS_DEBUG_INCOMING },
+ { "return", G_DBUS_DEBUG_RETURN },
+ { "emission", G_DBUS_DEBUG_EMISSION },
+ { "address", G_DBUS_DEBUG_ADDRESS }
+ };
+
+ _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
+ if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
+ _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
}
g_once_init_leave (&initialized, 1);
/* ---------------------------------------------------------------------------------------------------- */
-gchar *
-_g_dbus_compute_complete_signature (GDBusArgInfo **args,
- gboolean include_parentheses)
+GVariantType *
+_g_dbus_compute_complete_signature (GDBusArgInfo **args)
{
- GString *s;
+ const GVariantType *arg_types[256];
guint n;
- if (include_parentheses)
- s = g_string_new ("(");
- else
- s = g_string_new ("");
- if (args != NULL)
+ if (args)
for (n = 0; args[n] != NULL; n++)
- g_string_append (s, args[n]->signature);
+ {
+ /* DBus places a hard limit of 255 on signature length.
+ * therefore number of args must be less than 256.
+ */
+ g_assert (n < 256);
+
+ arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
+
+ if G_UNLIKELY (arg_types[n] == NULL)
+ return NULL;
+ }
+ else
+ n = 0;
+
+ return g_variant_type_new_tuple (arg_types, n);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+#ifdef G_OS_WIN32
+
+extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
+
+gchar *
+_g_dbus_win32_get_user_sid (void)
+{
+ HANDLE h;
+ TOKEN_USER *user;
+ DWORD token_information_len;
+ PSID psid;
+ gchar *sid;
+ gchar *ret;
+
+ ret = NULL;
+ user = NULL;
+ h = INVALID_HANDLE_VALUE;
+
+ if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
+ {
+ g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
+ goto out;
+ }
+
+ /* Get length of buffer */
+ token_information_len = 0;
+ if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
+ {
+ if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
+ {
+ g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
+ goto out;
+ }
+ }
+ user = g_malloc (token_information_len);
+ if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
+ {
+ g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
+ goto out;
+ }
+
+ psid = user->User.Sid;
+ if (!IsValidSid (psid))
+ {
+ g_warning ("Invalid SID");
+ goto out;
+ }
+
+ if (!ConvertSidToStringSidA (psid, &sid))
+ {
+ g_warning ("Invalid SID");
+ goto out;
+ }
+
+ ret = g_strdup (sid);
+ LocalFree (sid);
+
+out:
+ g_free (user);
+ if (h != INVALID_HANDLE_VALUE)
+ CloseHandle (h);
+ return ret;
+}
+#endif
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gchar *
+_g_dbus_get_machine_id (GError **error)
+{
+ gchar *ret;
+ /* TODO: use PACKAGE_LOCALSTATEDIR ? */
+ ret = NULL;
+ if (!g_file_get_contents ("/var/lib/dbus/machine-id",
+ &ret,
+ NULL,
+ error))
+ {
+ g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+ }
+ else
+ {
+ /* TODO: validate value */
+ g_strstrip (ret);
+ }
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gchar *
+_g_dbus_enum_to_string (GType enum_type, gint value)
+{
+ gchar *ret;
+ GEnumClass *klass;
+ GEnumValue *enum_value;
+
+ klass = g_type_class_ref (enum_type);
+ enum_value = g_enum_get_value (klass, value);
+ if (enum_value != NULL)
+ ret = g_strdup (enum_value->value_nick);
+ else
+ ret = g_strdup_printf ("unknown (value %d)", value);
+ g_type_class_unref (klass);
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+write_message_print_transport_debug (gssize bytes_written,
+ MessageToWriteData *data)
+{
+ if (G_LIKELY (!_g_dbus_debug_transport ()))
+ goto out;
+
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
+ bytes_written,
+ g_dbus_message_get_serial (data->message),
+ data->blob_size,
+ data->total_written,
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ out:
+ ;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+read_message_print_transport_debug (gssize bytes_read,
+ GDBusWorker *worker)
+{
+ gsize size;
+ gint32 serial;
+ gint32 message_length;
+
+ if (G_LIKELY (!_g_dbus_debug_transport ()))
+ goto out;
+
+ size = bytes_read + worker->read_buffer_cur_size;
+ serial = 0;
+ message_length = 0;
+ if (size >= 16)
+ message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
+ if (size >= 1)
+ {
+ switch (worker->read_buffer[0])
+ {
+ case 'l':
+ if (size >= 12)
+ serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
+ break;
+ case 'B':
+ if (size >= 12)
+ serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
+ break;
+ default:
+ /* an error will be set elsewhere if this happens */
+ goto out;
+ }
+ }
+
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
+ bytes_read,
+ serial,
+ message_length,
+ worker->read_buffer_cur_size,
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ out:
+ ;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gboolean
+_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
+ GValue *return_accu,
+ const GValue *handler_return,
+ gpointer dummy)
+{
+ gboolean continue_emission;
+ gboolean signal_return;
- if (include_parentheses)
- g_string_append_c (s, ')');
+ signal_return = g_value_get_boolean (handler_return);
+ g_value_set_boolean (return_accu, signal_return);
+ continue_emission = signal_return;
- return g_string_free (s, FALSE);
+ return continue_emission;
}