+ g_assert (data->flushers != NULL);
+ flush_data_list_complete (data->flushers, error);
+ g_list_free (data->flushers);
+
+ if (error != NULL)
+ g_error_free (error);
+
+ /* Make sure we tell folks that we don't have additional
+ flushes pending */
+ g_mutex_lock (data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (data->worker->write_lock);
+
+ /* OK, cool, finally kick off the next write */
+ maybe_write_next_message (data->worker);
+
+ _g_dbus_worker_unref (data->worker);
+ g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is false on entry
+ */
+static void
+message_written (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ /* first log the fact that we wrote a message */
+ if (G_UNLIKELY (_g_dbus_debug_message ()))
+ {
+ gchar *s;
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Message:\n"
+ " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
+ message_data->blob_size);
+ s = g_dbus_message_print (message_data->message, 2);
+ g_print ("%s", s);
+ g_free (s);
+ if (G_UNLIKELY (_g_dbus_debug_payload ()))
+ {
+ s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
+ g_print ("%s\n", s);
+ g_free (s);
+ }
+ _g_dbus_debug_print_unlock ();
+ }
+
+ /* then first wake up pending flushes and, if needed, flush the stream */
+ flushers = NULL;
+ g_mutex_lock (worker->write_lock);
+ worker->write_num_messages_written += 1;
+ for (l = worker->write_pending_flushes; l != NULL; l = ll)
+ {
+ FlushData *f = l->data;
+ ll = l->next;
+
+ if (f->number_to_wait_for == worker->write_num_messages_written)
+ {
+ flushers = g_list_append (flushers, f);
+ worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ }
+ }
+ if (flushers != NULL)
+ {
+ g_assert (!worker->output_pending);
+ worker->output_pending = TRUE;
+ }
+ g_mutex_unlock (worker->write_lock);
+
+ if (flushers != NULL)
+ {
+ FlushAsyncData *data;
+ data = g_new0 (FlushAsyncData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->flushers = flushers;
+ /* flush the stream before writing the next message */
+ g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
+ G_PRIORITY_DEFAULT,
+ worker->cancellable,
+ ostream_flush_cb,
+ data);
+ }
+ else
+ {
+ /* kick off the next write! */
+ maybe_write_next_message (worker);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+write_message_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ GError *error;
+
+ g_mutex_lock (data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (data->worker->write_lock);
+
+ error = NULL;
+ if (!write_message_finish (res, &error))
+ {
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+ g_error_free (error);
+ }
+
+ /* this function will also kick of the next write (it might need to
+ * flush so writing the next message might happen much later
+ * e.g. async)
+ */
+ message_written (data->worker, data);
+
+ message_to_write_data_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+iostream_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GError *error = NULL;
+ GList *pending_close_attempts, *pending_flush_attempts;
+ GQueue *send_queue;
+
+ g_io_stream_close_finish (worker->stream, res, &error);
+
+ g_mutex_lock (worker->write_lock);
+
+ pending_close_attempts = worker->pending_close_attempts;
+ worker->pending_close_attempts = NULL;
+
+ pending_flush_attempts = worker->write_pending_flushes;
+ worker->write_pending_flushes = NULL;
+
+ send_queue = worker->write_queue;
+ worker->write_queue = g_queue_new ();
+
+ g_assert (worker->output_pending);
+ worker->output_pending = FALSE;
+
+ g_mutex_unlock (worker->write_lock);
+
+ while (pending_close_attempts != NULL)
+ {
+ CloseData *close_data = pending_close_attempts->data;
+
+ pending_close_attempts = g_list_delete_link (pending_close_attempts,
+ pending_close_attempts);
+
+ if (close_data->result != NULL)
+ {
+ if (error != NULL)
+ g_simple_async_result_set_from_error (close_data->result, error);
+
+ /* this must be in an idle because the result is likely to be
+ * intended for another thread
+ */
+ g_simple_async_result_complete_in_idle (close_data->result);
+ }
+
+ close_data_free (close_data);
+ }
+
+ g_clear_error (&error);
+
+ /* all messages queued for sending are discarded */
+ g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+ g_queue_free (send_queue);
+
+ /* all queued flushes fail */
+ error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ flush_data_list_complete (pending_flush_attempts, error);
+ g_list_free (pending_flush_attempts);
+ g_clear_error (&error);
+
+ _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be false on entry
+ */
+static void
+maybe_write_next_message (GDBusWorker *worker)
+{
+ MessageToWriteData *data;
+
+ write_next:
+ /* we mustn't try to write two things at once */
+ g_assert (!worker->output_pending);
+
+ g_mutex_lock (worker->write_lock);
+
+ /* if we want to close the connection, that takes precedence */
+ if (worker->pending_close_attempts != NULL)
+ {
+ worker->output_pending = TRUE;
+
+ g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+ NULL, iostream_close_cb,
+ _g_dbus_worker_ref (worker));
+ data = NULL;
+ }
+ else
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = TRUE;
+ }
+
+ g_mutex_unlock (worker->write_lock);
+
+ /* Note that write_lock is only used for protecting the @write_queue
+ * and @output_pending fields of the GDBusWorker struct ... which we
+ * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+ *
+ * Therefore, it's fine to drop it here when calling back into user
+ * code and then writing the message out onto the GIOStream since this
+ * function only runs on the worker thread.
+ */
+ if (data != NULL)
+ {
+ GDBusMessage *old_message;
+ guchar *new_blob;
+ gsize new_blob_size;
+ GError *error;
+
+ old_message = data->message;
+ data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (data->message == old_message)
+ {
+ /* filters had no effect - do nothing */
+ }
+ else if (data->message == NULL)
+ {
+ /* filters dropped message */
+ g_mutex_lock (worker->write_lock);
+ worker->output_pending = FALSE;
+ g_mutex_unlock (worker->write_lock);
+ message_to_write_data_free (data);
+ goto write_next;
+ }
+ else
+ {
+ /* filters altered the message -> reencode */
+ error = NULL;
+ new_blob = g_dbus_message_to_blob (data->message,
+ &new_blob_size,
+ worker->capabilities,
+ &error);
+ if (new_blob == NULL)
+ {
+ /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+ * the old message instead
+ */
+ g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+ g_dbus_message_get_serial (data->message),
+ error->message);
+ g_error_free (error);
+ }
+ else
+ {
+ g_free (data->blob);
+ data->blob = (gchar *) new_blob;
+ data->blob_size = new_blob_size;
+ }
+ }
+
+ write_message_async (worker,
+ data,
+ write_message_cb,
+ data);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be true or false
+ */
+static gboolean
+write_message_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+
+ /* Because this is the worker thread, we can read this struct member
+ * without holding the lock: no other thread ever modifies it.
+ */
+ if (!worker->output_pending)
+ maybe_write_next_message (worker);
+
+ return FALSE;
+}
+
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+static void
+schedule_write_in_worker_thread (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ CloseData *close_data)
+{
+ g_mutex_lock (worker->write_lock);
+
+ if (write_data != NULL)
+ g_queue_push_tail (worker->write_queue, write_data);
+
+ if (close_data != NULL)
+ worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+ close_data);
+
+ if (!worker->output_pending)
+ {
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ write_message_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+ }
+
+ g_mutex_unlock (worker->write_lock);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_send_message (GDBusWorker *worker,
+ GDBusMessage *message,
+ gchar *blob,
+ gsize blob_len)
+{
+ MessageToWriteData *data;
+
+ g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+ g_return_if_fail (blob != NULL);
+ g_return_if_fail (blob_len > 16);
+
+ data = g_new0 (MessageToWriteData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->message = g_object_ref (message);
+ data->blob = blob; /* steal! */
+ data->blob_size = blob_len;
+
+ schedule_write_in_worker_thread (worker, data, NULL);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+GDBusWorker *
+_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,