+ error = NULL;
+ bytes_written = g_socket_send_message (data->worker->socket,
+ NULL, /* address */
+ &vector,
+ 1,
+ control_message != NULL ? &control_message : NULL,
+ control_message != NULL ? 1 : 0,
+ G_SOCKET_MSG_NONE,
+ data->worker->cancellable,
+ &error);
+ if (control_message != NULL)
+ g_object_unref (control_message);
+
+ if (bytes_written == -1)
+ {
+ /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ GSource *source;
+ source = g_socket_create_source (data->worker->socket,
+ G_IO_OUT | G_IO_HUP | G_IO_ERR,
+ data->worker->cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) on_socket_ready,
+ data,
+ NULL); /* GDestroyNotify */
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ g_error_free (error);
+ goto out;
+ }
+ g_simple_async_result_take_error (simple, error);
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+ g_assert (bytes_written > 0); /* zero is never returned */
+
+ write_message_print_transport_debug (bytes_written, data);
+
+ data->total_written += bytes_written;
+ g_assert (data->total_written <= data->blob_size);
+ if (data->total_written == data->blob_size)
+ {
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+
+ write_message_continue_writing (data);
+ }
+#endif
+ else
+ {
+#ifdef G_OS_UNIX
+ if (fd_list != NULL)
+ {
+ g_simple_async_result_set_error (simple,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Tried sending a file descriptor on unsupported stream of type %s",
+ g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+#endif
+
+ g_output_stream_write_async (ostream,
+ (const gchar *) data->blob + data->total_written,
+ data->blob_size - data->total_written,
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ write_message_async_cb,
+ data);
+ }
+#ifdef G_OS_UNIX
+ out:
+#endif
+ ;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_async (GDBusWorker *worker,
+ MessageToWriteData *data,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ data->simple = g_simple_async_result_new (NULL,
+ callback,
+ user_data,
+ write_message_async);
+ data->total_written = 0;
+ write_message_continue_writing (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
+static gboolean
+write_message_finish (GAsyncResult *res,
+ GError **error)
+{
+ g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
+ if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
+ return FALSE;
+ else
+ return TRUE;
+}
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void continue_writing (GDBusWorker *worker);
+
+typedef struct
+{
+ GDBusWorker *worker;
+ GList *flushers;
+} FlushAsyncData;
+
+static void
+flush_data_list_complete (const GList *flushers,
+ const GError *error)
+{
+ const GList *l;
+
+ for (l = flushers; l != NULL; l = l->next)
+ {
+ FlushData *f = l->data;
+
+ f->error = error != NULL ? g_error_copy (error) : NULL;
+
+ g_mutex_lock (&f->mutex);
+ g_cond_signal (&f->cond);
+ g_mutex_unlock (&f->mutex);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_FLUSH on entry
+ */
+static void
+ostream_flush_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ FlushAsyncData *data = user_data;
+ GError *error;
+
+ error = NULL;
+ g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+ res,
+ &error);
+
+ if (error == NULL)
+ {
+ if (G_UNLIKELY (_g_dbus_debug_transport ()))
+ {
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " ---- FLUSHED stream of type %s\n",
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+ _g_dbus_debug_print_unlock ();
+ }
+ }
+
+ g_assert (data->flushers != NULL);
+ flush_data_list_complete (data->flushers, error);
+ g_list_free (data->flushers);
+
+ if (error != NULL)
+ g_error_free (error);
+
+ /* Make sure we tell folks that we don't have additional
+ flushes pending */
+ g_mutex_lock (&data->worker->write_lock);
+ data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+ g_assert (data->worker->output_pending == PENDING_FLUSH);
+ data->worker->output_pending = PENDING_NONE;
+ g_mutex_unlock (&data->worker->write_lock);
+
+ /* OK, cool, finally kick off the next write */
+ continue_writing (data->worker);
+
+ _g_dbus_worker_unref (data->worker);
+ g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_FLUSH on entry
+ */
+static void
+start_flush (FlushAsyncData *data)
+{
+ g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ ostream_flush_cb,
+ data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
+ if (G_UNLIKELY (_g_dbus_debug_message ()))
+ {
+ gchar *s;
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Message:\n"
+ " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
+ message_data->blob_size);
+ s = g_dbus_message_print (message_data->message, 2);
+ g_print ("%s", s);
+ g_free (s);
+ if (G_UNLIKELY (_g_dbus_debug_payload ()))
+ {
+ s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
+ g_print ("%s\n", s);
+ g_free (s);
+ }
+ _g_dbus_debug_print_unlock ();
+ }
+
+ worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ flushers = NULL;
+ for (l = worker->write_pending_flushes; l != NULL; l = ll)
+ {
+ FlushData *f = l->data;
+ ll = l->next;
+
+ if (f->number_to_wait_for == worker->write_num_messages_written)
+ {
+ flushers = g_list_append (flushers, f);
+ worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ }
+ }
+ if (flushers != NULL)
+ {
+ g_assert (worker->output_pending == PENDING_NONE);
+ worker->output_pending = PENDING_FLUSH;
+ }
+
+ if (flushers != NULL)
+ {
+ FlushAsyncData *data;
+
+ data = g_new0 (FlushAsyncData, 1);
+ data->worker = _g_dbus_worker_ref (worker);
+ data->flushers = flushers;
+ return data;
+ }
+
+ return NULL;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ MessageToWriteData *data = user_data;
+ GError *error;
+
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending == PENDING_WRITE);
+ data->worker->output_pending = PENDING_NONE;
+
+ error = NULL;
+ if (!write_message_finish (res, &error))
+ {
+ g_mutex_unlock (&data->worker->write_lock);
+
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+ g_error_free (error);
+
+ g_mutex_lock (&data->worker->write_lock);
+ }
+
+ message_written_unlocked (data->worker, data);
+
+ g_mutex_unlock (&data->worker->write_lock);
+
+ continue_writing (data->worker);
+
+ message_to_write_data_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_CLOSE on entry
+ */
+static void
+iostream_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{