+ {
+ flush_async_data = prepare_flush_unlocked (worker);
+
+ if (flush_async_data == NULL)
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = PENDING_WRITE;
+ }
+ }
+
+ g_mutex_unlock (&worker->write_lock);
+
+ /* Note that write_lock is only used for protecting the @write_queue
+ * and @output_pending fields of the GDBusWorker struct ... which we
+ * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+ *
+ * Therefore, it's fine to drop it here when calling back into user
+ * code and then writing the message out onto the GIOStream since this
+ * function only runs on the worker thread.
+ */
+
+ if (flush_async_data != NULL)
+ {
+ start_flush (flush_async_data);
+ g_assert (data == NULL);
+ }
+ else if (data != NULL)
+ {
+ GDBusMessage *old_message;
+ guchar *new_blob;
+ gsize new_blob_size;
+ GError *error;
+
+ old_message = data->message;
+ data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (data->message == old_message)
+ {
+ /* filters had no effect - do nothing */
+ }
+ else if (data->message == NULL)
+ {
+ /* filters dropped message */
+ g_mutex_lock (&worker->write_lock);
+ worker->output_pending = PENDING_NONE;
+ g_mutex_unlock (&worker->write_lock);
+ message_to_write_data_free (data);
+ goto write_next;
+ }
+ else
+ {
+ /* filters altered the message -> reencode */
+ error = NULL;
+
+ /* [KDBUS]
+ * Setting protocol version, before invoking g_dbus_message_to_blob() will
+ * be removed after preparing new function only for kdbus transport purposes
+ * (this function will be able to create blob directly/unconditionally in memfd
+ * object, without making copy):
+ *
+ * [1] https://code.google.com/p/d-bus/source/browse/TODO
+ */
+
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ _g_dbus_message_set_protocol_ver (data->message,2);
+ else
+ _g_dbus_message_set_protocol_ver (data->message,1);
+
+ new_blob = g_dbus_message_to_blob (data->message,
+ &new_blob_size,
+ worker->capabilities,
+ &error);
+ if (new_blob == NULL)
+ {
+ /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+ * the old message instead
+ */
+ g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+ g_dbus_message_get_serial (data->message),
+ error->message);
+ g_error_free (error);
+ }
+ else
+ {
+ g_free (data->blob);
+ data->blob = (gchar *) new_blob;
+ data->blob_size = new_blob_size;
+ }
+ }
+
+ write_message_async (worker,
+ data,
+ write_message_cb,
+ data);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be anything
+ */
+static gboolean
+continue_writing_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+
+ /* Because this is the worker thread, we can read this struct member
+ * without holding the lock: no other thread ever modifies it.
+ */
+ if (worker->output_pending == PENDING_NONE)
+ continue_writing (worker);
+
+ return FALSE;
+}
+
+/**
+ * @write_data: (transfer full) (allow-none):
+ * @flush_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is held on entry
+ * output_pending may be anything
+ */
+static void
+schedule_writing_unlocked (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ FlushData *flush_data,
+ CloseData *close_data)
+{
+ if (write_data != NULL)
+ g_queue_push_tail (worker->write_queue, write_data);
+
+ if (flush_data != NULL)
+ worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
+
+ if (close_data != NULL)
+ worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+ close_data);
+
+ /* If we had output pending, the next bit of output will happen
+ * automatically when it finishes, so we only need to do this
+ * if nothing was pending.
+ *
+ * The idle callback will re-check that output_pending is still
+ * PENDING_NONE, to guard against output starting before the idle.
+ */
+ if (worker->output_pending == PENDING_NONE)
+ {
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ continue_writing_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+ }
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_send_message (GDBusWorker *worker,
+ GDBusMessage *message,
+ gchar *blob,
+ gsize blob_len)
+{
+ MessageToWriteData *data;
+
+ g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+ g_return_if_fail (blob != NULL);
+ g_return_if_fail (blob_len > 16);
+
+ data = g_new0 (MessageToWriteData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->message = g_object_ref (message);
+ data->blob = blob; /* steal! */
+ data->blob_size = blob_len;
+
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, data, NULL, NULL);
+ g_mutex_unlock (&worker->write_lock);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+GDBusWorker *
+_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,
+ gboolean initially_frozen,
+ GDBusWorkerMessageReceivedCallback message_received_callback,
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
+ GDBusWorkerDisconnectedCallback disconnected_callback,
+ gpointer user_data)
+{
+ GDBusWorker *worker;
+ GSource *idle_source;
+
+ g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
+ g_return_val_if_fail (message_received_callback != NULL, NULL);
+ g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
+ g_return_val_if_fail (disconnected_callback != NULL, NULL);
+
+ worker = g_new0 (GDBusWorker, 1);
+ worker->ref_count = 1;
+
+ g_mutex_init (&worker->read_lock);
+ worker->message_received_callback = message_received_callback;
+ worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
+ worker->disconnected_callback = disconnected_callback;
+ worker->user_data = user_data;
+ worker->stream = g_object_ref (stream);
+ worker->capabilities = capabilities;
+ worker->cancellable = g_cancellable_new ();
+ worker->output_pending = PENDING_NONE;
+
+ worker->frozen = initially_frozen;
+ worker->received_messages_while_frozen = g_queue_new ();
+
+ g_mutex_init (&worker->write_lock);
+ worker->write_queue = g_queue_new ();
+
+ if (G_IS_SOCKET_CONNECTION (worker->stream))
+ worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+#endif
+
+ worker->shared_thread_data = _g_dbus_shared_thread_ref ();
+
+ /* begin reading */
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ _g_dbus_worker_do_initial_read,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+
+ return worker;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_close (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GSimpleAsyncResult *result)
+{
+ CloseData *close_data;
+
+ close_data = g_slice_new0 (CloseData);
+ close_data->worker = _g_dbus_worker_ref (worker);
+ close_data->cancellable =
+ (cancellable == NULL ? NULL : g_object_ref (cancellable));
+ close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+ /* Don't set worker->close_expected here - we're in the wrong thread.
+ * It'll be set before the actual close happens.
+ */
+ g_cancellable_cancel (worker->cancellable);
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, NULL, NULL, close_data);
+ g_mutex_unlock (&worker->write_lock);
+}
+
+/* This can be called from any thread - frees worker. Note that
+ * callbacks might still happen if called from another thread than the
+ * worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_stop (GDBusWorker *worker)
+{
+ g_atomic_int_set (&worker->stopped, TRUE);
+
+ /* Cancel any pending operations and schedule a close of the underlying I/O
+ * stream in the worker thread
+ */
+ _g_dbus_worker_close (worker, NULL, NULL);
+
+ /* _g_dbus_worker_close holds a ref until after an idle in the worker
+ * thread has run, so we no longer need to unref in an idle like in
+ * commit 322e25b535
+ */
+ _g_dbus_worker_unref (worker);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks
+ * calling thread until all queued outgoing messages are written and
+ * the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gboolean ret;
+ FlushData *data;
+ guint64 pending_writes;
+
+ data = NULL;
+ ret = TRUE;
+
+ g_mutex_lock (&worker->write_lock);
+
+ /* if the queue is empty, no write is in-flight and we haven't written
+ * anything since the last flush, then there's nothing to wait for
+ */
+ pending_writes = g_queue_get_length (worker->write_queue);
+
+ /* if a write is in-flight, we shouldn't be satisfied until the first
+ * flush operation that follows it
+ */
+ if (worker->output_pending == PENDING_WRITE)
+ pending_writes += 1;
+
+ if (pending_writes > 0 ||
+ worker->write_num_messages_written != worker->write_num_messages_flushed)
+ {
+ data = g_new0 (FlushData, 1);
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
+ data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
+ g_mutex_lock (&data->mutex);
+
+ schedule_writing_unlocked (worker, NULL, data, NULL);
+ }
+ g_mutex_unlock (&worker->write_lock);
+
+ if (data != NULL)
+ {
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
+
+ /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
+ g_cond_clear (&data->cond);
+ g_mutex_clear (&data->mutex);
+ if (data->error != NULL)
+ {
+ ret = FALSE;
+ g_propagate_error (error, data->error);
+ }
+ g_free (data);
+ }
+
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+#define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
+#define G_DBUS_DEBUG_TRANSPORT (1<<1)
+#define G_DBUS_DEBUG_MESSAGE (1<<2)
+#define G_DBUS_DEBUG_PAYLOAD (1<<3)
+#define G_DBUS_DEBUG_CALL (1<<4)
+#define G_DBUS_DEBUG_SIGNAL (1<<5)
+#define G_DBUS_DEBUG_INCOMING (1<<6)
+#define G_DBUS_DEBUG_RETURN (1<<7)
+#define G_DBUS_DEBUG_EMISSION (1<<8)
+#define G_DBUS_DEBUG_ADDRESS (1<<9)
+
+static gint _gdbus_debug_flags = 0;
+
+gboolean
+_g_dbus_debug_authentication (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
+}
+
+gboolean
+_g_dbus_debug_transport (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
+}
+
+gboolean
+_g_dbus_debug_message (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
+}
+
+gboolean
+_g_dbus_debug_payload (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
+}
+
+gboolean
+_g_dbus_debug_call (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
+}
+
+gboolean
+_g_dbus_debug_signal (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
+}
+
+gboolean
+_g_dbus_debug_incoming (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
+}
+
+gboolean
+_g_dbus_debug_return (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
+}
+
+gboolean
+_g_dbus_debug_emission (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
+}
+
+gboolean
+_g_dbus_debug_address (void)
+{
+ _g_dbus_initialize ();
+ return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
+}
+
+G_LOCK_DEFINE_STATIC (print_lock);
+
+void
+_g_dbus_debug_print_lock (void)
+{
+ G_LOCK (print_lock);
+}
+
+void
+_g_dbus_debug_print_unlock (void)
+{
+ G_UNLOCK (print_lock);
+}
+
+/**
+ * _g_dbus_initialize:
+ *
+ * Does various one-time init things such as
+ *
+ * - registering the G_DBUS_ERROR error domain
+ * - parses the G_DBUS_DEBUG environment variable
+ */
+void
+_g_dbus_initialize (void)
+{
+ static volatile gsize initialized = 0;
+
+ if (g_once_init_enter (&initialized))
+ {
+ volatile GQuark g_dbus_error_domain;
+ const gchar *debug;
+
+ g_dbus_error_domain = G_DBUS_ERROR;
+ (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
+
+ debug = g_getenv ("G_DBUS_DEBUG");
+ if (debug != NULL)
+ {
+ const GDebugKey keys[] = {
+ { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
+ { "transport", G_DBUS_DEBUG_TRANSPORT },
+ { "message", G_DBUS_DEBUG_MESSAGE },
+ { "payload", G_DBUS_DEBUG_PAYLOAD },
+ { "call", G_DBUS_DEBUG_CALL },
+ { "signal", G_DBUS_DEBUG_SIGNAL },
+ { "incoming", G_DBUS_DEBUG_INCOMING },
+ { "return", G_DBUS_DEBUG_RETURN },
+ { "emission", G_DBUS_DEBUG_EMISSION },
+ { "address", G_DBUS_DEBUG_ADDRESS }
+ };
+
+ _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
+ if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
+ _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
+ }
+
+ g_once_init_leave (&initialized, 1);
+ }
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+GVariantType *
+_g_dbus_compute_complete_signature (GDBusArgInfo **args)
+{
+ const GVariantType *arg_types[256];
+ guint n;
+
+ if (args)