[kdbus] Enable 'initial_read()' - is no longer needed
[platform/upstream/glib.git] / gio / gdbusprivate.c
index 30705bf..91feb41 100644 (file)
@@ -13,9 +13,7 @@
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
  *
  * Author: David Zeuthen <davidz@redhat.com>
  */
@@ -24,9 +22,6 @@
 
 #include <stdlib.h>
 #include <string.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
 
 #include "giotypes.h"
 #include "gsocket.h"
 #include "gasyncresult.h"
 #include "gsimpleasyncresult.h"
 #include "ginputstream.h"
+#include "gmemoryinputstream.h"
 #include "giostream.h"
+#include "glib/gstdio.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
+#include "gsocketoutputstream.h"
 
 #ifdef G_OS_UNIX
+#include "gkdbus.h"
+#include "gkdbusconnection.h"
 #include "gunixfdmessage.h"
 #include "gunixconnection.h"
 #include "gunixcredentialsmessage.h"
@@ -52,7 +52,8 @@
 #endif
 
 #include "glibintl.h"
-#include "gioalias.h"
+
+static gboolean _g_dbus_worker_do_initial_read (gpointer data);
 
 /* ---------------------------------------------------------------------------------------------------- */
 
@@ -91,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+typedef struct
+{
+  GKdbus *kdbus;
+  GCancellable *cancellable;
+
+  GSimpleAsyncResult *simple;
+
+  gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData  *data)
+{
+  g_object_unref (data->kdbus);
+  if (data->cancellable != NULL)
+    g_object_unref (data->cancellable);
+  g_object_unref (data->simple);
+  g_free (data);
+}
+
+static gboolean
+_g_kdbus_read_ready (GKdbus        *kdbus,
+                     GIOCondition   condition,
+                     gpointer       user_data)
+{
+  ReadKdbusData *data = user_data;
+  GError *error = NULL;
+  gssize result;
+
+  result = _g_kdbus_receive (data->kdbus,
+                             data->cancellable,
+                             &error);
+
+  if (result >= 0)
+    {
+      g_simple_async_result_set_op_res_gssize (data->simple, result);
+    }
+  else
+    {
+      g_assert (error != NULL);
+      g_simple_async_result_take_error (data->simple, error);
+    }
+
+  if (data->from_mainloop)
+    g_simple_async_result_complete (data->simple);
+  else
+    g_simple_async_result_complete_in_idle (data->simple);
+
+  return FALSE;
+}
+
+static void
+_g_kdbus_read (GKdbus               *kdbus,
+               GCancellable         *cancellable,
+               GAsyncReadyCallback   callback,
+               gpointer              user_data)
+{
+  ReadKdbusData *data;
+  GSource *source;
+
+  data = g_new0 (ReadKdbusData, 1);
+  data->kdbus = g_object_ref (kdbus);
+  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+
+  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+                                            callback,
+                                            user_data,
+                                            _g_kdbus_read);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+  data->from_mainloop = TRUE;
+  source = _g_kdbus_create_source (data->kdbus,
+                                   G_IO_IN,
+                                   cancellable);
+  g_source_set_callback (source,
+                         (GSourceFunc) _g_kdbus_read_ready,
+                         data,
+                         (GDestroyNotify) read_kdbus_data_free);
+  g_source_attach (source, g_main_context_get_thread_default ());
+  g_source_unref (source);
+}
+
+static gssize
+_g_kdbus_read_finish (GKdbus        *kdbus,
+                      GAsyncResult  *result,
+                      GError       **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    return -1;
+  else
+    return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
+
 /* Unfortunately ancillary messages are discarded when reading from a
  * socket using the GSocketInputStream abstraction. So we provide a
  * very GInputStream-ish API that uses GSocket in this case (very
@@ -152,8 +254,7 @@ _g_socket_read_with_control_messages_ready (GSocket      *socket,
   else
     {
       g_assert (error != NULL);
-      g_simple_async_result_set_from_error (data->simple, error);
-      g_error_free (error);
+      g_simple_async_result_take_error (data->simple, error);
     }
 
   if (data->from_mainloop)
@@ -189,6 +290,7 @@ _g_socket_read_with_control_messages (GSocket                 *socket,
                                             callback,
                                             user_data,
                                             _g_socket_read_with_control_messages);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
 
   if (!g_socket_condition_check (socket, G_IO_IN))
     {
@@ -229,153 +331,145 @@ _g_socket_read_with_control_messages_finish (GSocket       *socket,
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-G_LOCK_DEFINE_STATIC (shared_thread_lock);
+/* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+
+static GPtrArray *ensured_classes = NULL;
+
+static void
+ensure_type (GType gtype)
+{
+  g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
+}
+
+static void
+release_required_types (void)
+{
+  g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
+  g_ptr_array_unref (ensured_classes);
+  ensured_classes = NULL;
+}
+
+static void
+ensure_required_types (void)
+{
+  g_assert (ensured_classes == NULL);
+  ensured_classes = g_ptr_array_new ();
+  ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
+  ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
+}
+/* ---------------------------------------------------------------------------------------------------- */
 
 typedef struct
 {
-  gint num_users;
+  volatile gint refcount;
   GThread *thread;
   GMainContext *context;
   GMainLoop *loop;
 } SharedThreadData;
 
-static SharedThreadData *shared_thread_data = NULL;
-
 static gpointer
-shared_thread_func (gpointer data)
+gdbus_shared_thread_func (gpointer user_data)
 {
-  g_main_context_push_thread_default (shared_thread_data->context);
-  g_main_loop_run (shared_thread_data->loop);
-  g_main_context_pop_thread_default (shared_thread_data->context);
-  return NULL;
-}
+  SharedThreadData *data = user_data;
 
-typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
+  g_main_context_push_thread_default (data->context);
+  g_main_loop_run (data->loop);
+  g_main_context_pop_thread_default (data->context);
 
-typedef struct
-{
-  GDBusSharedThreadFunc func;
-  gpointer              user_data;
-  gboolean              done;
-} CallerData;
+  release_required_types ();
 
-static gboolean
-invoke_caller (gpointer user_data)
-{
-  CallerData *data = user_data;
-  data->func (data->user_data);
-  data->done = TRUE;
-  return FALSE;
+  return NULL;
 }
 
-static void
-_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
-                           gpointer              user_data)
-{
-  GError *error;
-  GSource *idle_source;
-  CallerData *data;
+/* ---------------------------------------------------------------------------------------------------- */
 
-  G_LOCK (shared_thread_lock);
+static SharedThreadData *
+_g_dbus_shared_thread_ref (void)
+{
+  static gsize shared_thread_data = 0;
+  SharedThreadData *ret;
 
-  if (shared_thread_data != NULL)
+  if (g_once_init_enter (&shared_thread_data))
     {
-      shared_thread_data->num_users += 1;
-      goto have_thread;
+      SharedThreadData *data;
+
+      /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+      ensure_required_types ();
+
+      data = g_new0 (SharedThreadData, 1);
+      data->refcount = 0;
+      
+      data->context = g_main_context_new ();
+      data->loop = g_main_loop_new (data->context, FALSE);
+      data->thread = g_thread_new ("gdbus",
+                                   gdbus_shared_thread_func,
+                                   data);
+      /* We can cast between gsize and gpointer safely */
+      g_once_init_leave (&shared_thread_data, (gsize) data);
     }
 
-  shared_thread_data = g_new0 (SharedThreadData, 1);
-  shared_thread_data->num_users = 1;
-
-  error = NULL;
-  shared_thread_data->context = g_main_context_new ();
-  shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
-  shared_thread_data->thread = g_thread_create (shared_thread_func,
-                                                NULL,
-                                                TRUE,
-                                                &error);
-  g_assert_no_error (error);
-
- have_thread:
-
-  data = g_new0 (CallerData, 1);
-  data->func = func;
-  data->user_data = user_data;
-  data->done = FALSE;
-
-  idle_source = g_idle_source_new ();
-  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
-  g_source_set_callback (idle_source,
-                         invoke_caller,
-                         data,
-                         NULL);
-  g_source_attach (idle_source, shared_thread_data->context);
-  g_source_unref (idle_source);
-
-  /* wait for the user code to run.. hmm.. probably use a condition variable instead */
-  while (!data->done)
-    g_thread_yield ();
-
-  g_free (data);
-
-  G_UNLOCK (shared_thread_lock);
+  ret = (SharedThreadData*) shared_thread_data;
+  g_atomic_int_inc (&ret->refcount);
+  return ret;
 }
 
 static void
-_g_dbus_shared_thread_unref (void)
+_g_dbus_shared_thread_unref (SharedThreadData *data)
 {
   /* TODO: actually destroy the shared thread here */
 #if 0
-  G_LOCK (shared_thread_lock);
-  g_assert (shared_thread_data != NULL);
-  shared_thread_data->num_users -= 1;
-  if (shared_thread_data->num_users == 0)
-    {
-      g_main_loop_quit (shared_thread_data->loop);
-      //g_thread_join (shared_thread_data->thread);
-      g_main_loop_unref (shared_thread_data->loop);
-      g_main_context_unref (shared_thread_data->context);
-      g_free (shared_thread_data);
-      shared_thread_data = NULL;
-      G_UNLOCK (shared_thread_lock);
-    }
-  else
+  g_assert (data != NULL);
+  if (g_atomic_int_dec_and_test (&data->refcount))
     {
-      G_UNLOCK (shared_thread_lock);
+      g_main_loop_quit (data->loop);
+      //g_thread_join (data->thread);
+      g_main_loop_unref (data->loop);
+      g_main_context_unref (data->context);
     }
 #endif
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+typedef enum {
+    PENDING_NONE = 0,
+    PENDING_WRITE,
+    PENDING_FLUSH,
+    PENDING_CLOSE
+} OutputPending;
+
 struct GDBusWorker
 {
   volatile gint                       ref_count;
 
-  gboolean                            stopped;
+  SharedThreadData                   *shared_thread_data;
+
+  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+  volatile gint                       stopped;
 
   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
    * only affects messages received from the other peer (since GDBusServer is the
    * only user) - we might want it to affect messages sent to the other peer too?
    */
   gboolean                            frozen;
+  GDBusCapabilityFlags                capabilities;
   GQueue                             *received_messages_while_frozen;
 
   GIOStream                          *stream;
-  GDBusCapabilityFlags                capabilities;
   GCancellable                       *cancellable;
   GDBusWorkerMessageReceivedCallback  message_received_callback;
   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  GThread                            *thread;
-
-  /* if not NULL, stream is GSocketConnection */
+  /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
   GSocket *socket;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  GKdbus  *kdbus;
+#endif
 
   /* used for reading */
-  GMutex                             *read_lock;
+  GMutex                              read_lock;
   gchar                              *read_buffer;
   gsize                               read_buffer_allocated_size;
   gsize                               read_buffer_cur_size;
@@ -384,17 +478,73 @@ struct GDBusWorker
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
 
+  /* Whether an async write, flush or close, or none of those, is pending.
+   * Only the worker thread may change its value, and only with the write_lock.
+   * Other threads may read its value when holding the write_lock.
+   * The worker thread may read its value at any time.
+   */
+  OutputPending                       output_pending;
   /* used for writing */
-  GMutex                             *write_lock;
+  GMutex                              write_lock;
+  /* queue of MessageToWriteData, protected by write_lock */
   GQueue                             *write_queue;
-  gboolean                            write_is_pending;
+  /* protected by write_lock */
+  guint64                             write_num_messages_written;
+  /* number of messages we'd written out last time we flushed;
+   * protected by write_lock
+   */
+  guint64                             write_num_messages_flushed;
+  /* list of FlushData, protected by write_lock */
+  GList                              *write_pending_flushes;
+  /* list of CloseData, protected by write_lock */
+  GList                              *pending_close_attempts;
+  /* no lock - only used from the worker thread */
+  gboolean                            close_expected;
 };
 
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+typedef struct
+{
+  GMutex  mutex;
+  GCond   cond;
+  guint64 number_to_wait_for;
+  GError *error;
+} FlushData;
+
 struct _MessageToWriteData ;
 typedef struct _MessageToWriteData MessageToWriteData;
 
 static void message_to_write_data_free (MessageToWriteData *data);
 
+static void read_message_print_transport_debug (gssize bytes_read,
+                                                GDBusWorker *worker);
+
+static void write_message_print_transport_debug (gssize bytes_written,
+                                                 MessageToWriteData *data);
+
+typedef struct {
+    GDBusWorker *worker;
+    GCancellable *cancellable;
+    GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+  if (close_data->cancellable != NULL)
+    g_object_unref (close_data->cancellable);
+
+  if (close_data->result != NULL)
+    g_object_unref (close_data->result);
+
+  _g_dbus_worker_unref (close_data->worker);
+  g_slice_free (CloseData, close_data);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 static GDBusWorker *
 _g_dbus_worker_ref (GDBusWorker *worker)
 {
@@ -407,21 +557,21 @@ _g_dbus_worker_unref (GDBusWorker *worker)
 {
   if (g_atomic_int_dec_and_test (&worker->ref_count))
     {
-      _g_dbus_shared_thread_unref ();
+      g_assert (worker->write_pending_flushes == NULL);
+
+      _g_dbus_shared_thread_unref (worker->shared_thread_data);
 
       g_object_unref (worker->stream);
 
-      g_mutex_free (worker->read_lock);
+      g_mutex_clear (&worker->read_lock);
       g_object_unref (worker->cancellable);
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
 
-      g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
-      g_queue_free (worker->received_messages_while_frozen);
-
-      g_mutex_free (worker->write_lock);
-      g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
-      g_queue_free (worker->write_queue);
+      g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
+      g_mutex_clear (&worker->write_lock);
+      g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
+      g_free (worker->read_buffer);
 
       g_free (worker);
     }
@@ -432,7 +582,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
                                   gboolean      remote_peer_vanished,
                                   GError       *error)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
 }
 
@@ -440,18 +590,19 @@ static void
 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
                                       GDBusMessage *message)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->message_received_callback (worker, message, worker->user_data);
 }
 
-static gboolean
+static GDBusMessage *
 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
                                               GDBusMessage *message)
 {
-  gboolean ret;
-  ret = FALSE;
-  if (!worker->stopped)
+  GDBusMessage *ret;
+  if (!g_atomic_int_get (&worker->stopped))
     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+  else
+    ret = message;
   return ret;
 }
 
@@ -480,7 +631,7 @@ unfreeze_in_idle_cb (gpointer user_data)
   GDBusWorker *worker = user_data;
   GDBusMessage *message;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
   if (worker->frozen)
     {
       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
@@ -494,7 +645,7 @@ unfreeze_in_idle_cb (gpointer user_data)
     {
       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
     }
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
   return FALSE;
 }
 
@@ -509,7 +660,8 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker)
                          unfreeze_in_idle_cb,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
-  g_source_attach (idle_source, shared_thread_data->context);
+  g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
+  g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 }
 
@@ -527,14 +679,28 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
   GError *error;
   gssize bytes_read;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
 
   /* If already stopped, don't even process the reply */
-  if (worker->stopped)
+  if (g_atomic_int_get (&worker->stopped))
     goto out;
 
   error = NULL;
-  if (worker->socket == NULL)
+  bytes_read = 0;
+
+  if (FALSE)
+    {
+    }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  else if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      bytes_read = _g_kdbus_read_finish (worker->kdbus,
+                                         res,
+                                         &error);
+      g_error ("[KDBUS] _g_dbus_worker_do_read_cb() - work in progress");
+    }
+#endif
+  else if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -572,7 +738,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                     {
                       /* TODO: really want a append_steal() */
                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
-                      close (fds[n]);
+                      (void) g_close (fds[n], NULL);
                     }
                 }
               g_free (fds);
@@ -608,7 +774,40 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
   if (bytes_read == -1)
     {
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
+        {
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- READ ERROR on stream of type %s:\n"
+                   "  ---- %s %d: %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+                   g_quark_to_string (error->domain), error->code,
+                   error->message);
+          _g_dbus_debug_print_unlock ();
+        }
+
+      /* Every async read that uses this callback uses worker->cancellable
+       * as its GCancellable. worker->cancellable gets cancelled if and only
+       * if the GDBusConnection tells us to close (either via
+       * _g_dbus_worker_stop, which is called on last-unref, or directly),
+       * so a cancelled read must mean our connection was closed locally.
+       *
+       * If we're closing, other errors are possible - notably,
+       * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+       * read in-flight. It seems sensible to treat all read errors during
+       * closing as an expected thing that doesn't trip exit-on-close.
+       *
+       * Because close_expected can't be set until we get into the worker
+       * thread, but the cancellable is signalled sooner (from another
+       * thread), we do still need to check the error.
+       */
+      if (worker->close_expected ||
+          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+      else
+        _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
       g_error_free (error);
       goto out;
     }
@@ -636,6 +835,8 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
       goto out;
     }
 
+  read_message_print_transport_debug (bytes_read, worker);
+
   worker->read_buffer_cur_size += bytes_read;
   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
     {
@@ -650,7 +851,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                                                      &error);
           if (message_len == -1)
             {
-              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
+              g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
               g_error_free (error);
               goto out;
@@ -691,6 +892,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
           if (worker->read_fd_list != NULL)
             {
               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
+              g_object_unref (worker->read_fd_list);
               worker->read_fd_list = NULL;
             }
 #endif
@@ -698,6 +900,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
           if (G_UNLIKELY (_g_dbus_debug_message ()))
             {
               gchar *s;
+              _g_dbus_debug_print_lock ();
               g_print ("========================================================================\n"
                        "GDBus-debug:Message:\n"
                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
@@ -705,9 +908,13 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
               s = g_dbus_message_print (message, 2);
               g_print ("%s", s);
               g_free (s);
-              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
-              g_print ("%s\n", s);
-              g_free (s);
+              if (G_UNLIKELY (_g_dbus_debug_payload ()))
+                {
+                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+                  g_print ("%s\n", s);
+                  g_free (s);
+                }
+              _g_dbus_debug_print_unlock ();
             }
 
           /* yay, got a message, go deliver it */
@@ -726,7 +933,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     }
 
  out:
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
 
   /* gives up the reference acquired when calling g_input_stream_read_async() */
   _g_dbus_worker_unref (worker);
@@ -736,6 +943,28 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 static void
 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 {
+  /* Note that we do need to keep trying to read even if close_expected is
+   * true, because only failing a read causes us to signal 'closed'.
+   */
+
+  /* [KDBUS]
+   * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
+   * instead of it we use kdbus memory pool. On connection stage KDBUS client
+   * have to register a memory pool, large enough to  carry all backlog of
+   * data enqueued for the connection.
+   */
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      _g_kdbus_read(worker->kdbus,
+                    worker->cancellable,
+                    (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+                    _g_dbus_worker_ref (worker));
+      return;
+    }
+#endif
+
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -776,250 +1005,774 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 }
 
 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static void
-_g_dbus_worker_do_read (GDBusWorker *worker)
+static gboolean
+_g_dbus_worker_do_initial_read (gpointer data)
 {
-  g_mutex_lock (worker->read_lock);
+  GDBusWorker *worker = data;
+  g_mutex_lock (&worker->read_lock);
   _g_dbus_worker_do_read_unlocked (worker);
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
+  return FALSE;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
 struct _MessageToWriteData
 {
+  GDBusWorker  *worker;
   GDBusMessage *message;
   gchar        *blob;
   gsize         blob_size;
+
+  gsize               total_written;
+  GSimpleAsyncResult *simple;
+
 };
 
 static void
 message_to_write_data_free (MessageToWriteData *data)
 {
-  g_object_unref (data->message);
+  _g_dbus_worker_unref (data->worker);
+  if (data->message)
+    g_object_unref (data->message);
   g_free (data->blob);
   g_free (data);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
-static gboolean
-write_message (GDBusWorker         *worker,
-               MessageToWriteData  *data,
-               GError             **error)
-{
-  gboolean ret;
-
-  g_return_val_if_fail (data->blob_size > 16, FALSE);
+static void write_message_continue_writing (MessageToWriteData *data);
 
-  ret = FALSE;
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_async_cb (GObject      *source_object,
+                        GAsyncResult *res,
+                        gpointer      user_data)
+{
+  MessageToWriteData *data = user_data;
+  GSimpleAsyncResult *simple;
+  gssize bytes_written;
+  GError *error;
 
-  /* First, the initial 16 bytes - special case UNIX sockets here
-   * since it may involve writing an ancillary message with file
-   * descriptors
+  /* Note: we can't access data->simple after calling g_async_result_complete () because the
+   * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
-#ifdef G_OS_UNIX
-  {
-    GOutputVector vector;
-    GSocketControlMessage *message;
-    GUnixFDList *fd_list;
-    gssize bytes_written;
-
-    fd_list = g_dbus_message_get_unix_fd_list (data->message);
-
-    message = NULL;
-    if (fd_list != NULL)
-      {
-        if (!G_IS_UNIX_CONNECTION (worker->stream))
-          {
-            g_set_error (error,
-                         G_IO_ERROR,
-                         G_IO_ERROR_INVALID_ARGUMENT,
-                         "Tried sending a file descriptor on unsupported stream of type %s",
-                         g_type_name (G_TYPE_FROM_INSTANCE (worker->stream)));
-            goto out;
-          }
-        else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
-          {
-            g_set_error_literal (error,
-                                 G_IO_ERROR,
-                                 G_IO_ERROR_INVALID_ARGUMENT,
-                                 "Tried sending a file descriptor but remote peer does not support this capability");
-            goto out;
-          }
-        message = g_unix_fd_message_new_with_fd_list (fd_list);
-      }
-
-    vector.buffer = data->blob;
-    vector.size = 16;
-
-    bytes_written = g_socket_send_message (worker->socket,
-                                           NULL, /* address */
-                                           &vector,
-                                           1,
-                                           message != NULL ? &message : NULL,
-                                           message != NULL ? 1 : 0,
-                                           G_SOCKET_MSG_NONE,
-                                           worker->cancellable,
-                                           error);
-    if (bytes_written == -1)
-      {
-        g_prefix_error (error, _("Error writing first 16 bytes of message to socket: "));
-        if (message != NULL)
-          g_object_unref (message);
-        goto out;
-      }
-    if (message != NULL)
-      g_object_unref (message);
-
-    if (bytes_written < 16)
-      {
-        /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary
-         * messages are sent?
-         */
-        g_assert_not_reached ();
-      }
-  }
-#else
-  /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */
-  if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
-                                  (const gchar *) data->blob,
-                                  16,
-                                  NULL, /* bytes_written */
-                                  worker->cancellable, /* cancellable */
-                                  error))
-    goto out;
-#endif
+  simple = data->simple;
 
-  /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */
-  if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream),
-                                  (const gchar *) data->blob + 16,
-                                  data->blob_size - 16,
-                                  NULL, /* bytes_written */
-                                  worker->cancellable, /* cancellable */
-                                  error))
-    goto out;
+  error = NULL;
+  bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
+                                                res,
+                                                &error);
+  if (bytes_written == -1)
+    {
+      g_simple_async_result_take_error (simple, error);
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
+    }
+  g_assert (bytes_written > 0); /* zero is never returned */
 
-  ret = TRUE;
+  write_message_print_transport_debug (bytes_written, data);
 
-  if (G_UNLIKELY (_g_dbus_debug_message ()))
+  data->total_written += bytes_written;
+  g_assert (data->total_written <= data->blob_size);
+  if (data->total_written == data->blob_size)
     {
-      gchar *s;
-      g_print ("========================================================================\n"
-               "GDBus-debug:Message:\n"
-               "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
-               data->blob_size);
-      s = g_dbus_message_print (data->message, 2);
-      g_print ("%s", s);
-      g_free (s);
-      s = _g_dbus_hexdump (data->blob, data->blob_size, 2);
-      g_print ("%s\n", s);
-      g_free (s);
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
     }
 
+  write_message_continue_writing (data);
+
  out:
-  return ret;
+  ;
 }
 
-/* ---------------------------------------------------------------------------------------------------- */
-
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+#ifdef G_OS_UNIX
 static gboolean
-write_message_in_idle_cb (gpointer user_data)
+on_socket_ready (GSocket      *socket,
+                 GIOCondition  condition,
+                 gpointer      user_data)
 {
-  GDBusWorker *worker = user_data;
-  gboolean more_writes_are_pending;
-  MessageToWriteData *data;
-  gboolean message_was_dropped;
-  GError *error;
+  MessageToWriteData *data = user_data;
+  write_message_continue_writing (data);
+  return FALSE; /* remove source */
+}
+#endif
 
-  g_mutex_lock (worker->write_lock);
-  data = g_queue_pop_head (worker->write_queue);
-  g_assert (data != NULL);
-  more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
-  worker->write_is_pending = more_writes_are_pending;
-  g_mutex_unlock (worker->write_lock);
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_continue_writing (MessageToWriteData *data)
+{
+  GOutputStream *ostream;
+#ifdef G_OS_UNIX
+  GSimpleAsyncResult *simple;
+  GUnixFDList *fd_list;
+#endif
 
-  /* Note that write_lock is only used for protecting the @write_queue
-   * and @write_is_pending fields of the GDBusWorker struct ... which we
-   * need to modify from arbitrary threads in _g_dbus_worker_send_message().
-   *
-   * Therefore, it's fine to drop it here when calling back into user
-   * code and then writing the message out onto the GIOStream since this
-   * function only runs on the worker thread.
+#ifdef G_OS_UNIX
+  /* Note: we can't access data->simple after calling g_async_result_complete () because the
+   * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
-  message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
-  if (G_LIKELY (!message_was_dropped))
+  simple = data->simple;
+#endif
+
+  ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+  fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
+
+  g_assert (!g_output_stream_has_pending (ostream));
+  g_assert_cmpint (data->total_written, <, data->blob_size);
+
+  if (FALSE)
     {
-      error = NULL;
-      if (!write_message (worker,
-                          data,
-                          &error))
-        {
-          /* TODO: handle */
-          _g_dbus_worker_emit_disconnected (worker, TRUE, error);
-          g_error_free (error);
-        }
     }
-  message_to_write_data_free (data);
+#ifdef G_OS_UNIX
+  else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
+    {
+      GOutputVector vector;
+      GSocketControlMessage *control_message;
+      gssize bytes_written;
+      GError *error;
 
-  return more_writes_are_pending;
-}
+      vector.buffer = data->blob;
+      vector.size = data->blob_size;
 
+      control_message = NULL;
+      if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
+        {
+          if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
+            {
+              g_simple_async_result_set_error (simple,
+                                               G_IO_ERROR,
+                                               G_IO_ERROR_FAILED,
+                                               "Tried sending a file descriptor but remote peer does not support this capability");
+              g_simple_async_result_complete (simple);
+              g_object_unref (simple);
+              goto out;
+            }
+          control_message = g_unix_fd_message_new_with_fd_list (fd_list);
+        }
+
+      error = NULL;
+      bytes_written = g_socket_send_message (data->worker->socket,
+                                             NULL, /* address */
+                                             &vector,
+                                             1,
+                                             control_message != NULL ? &control_message : NULL,
+                                             control_message != NULL ? 1 : 0,
+                                             G_SOCKET_MSG_NONE,
+                                             data->worker->cancellable,
+                                             &error);
+      if (control_message != NULL)
+        g_object_unref (control_message);
+
+      if (bytes_written == -1)
+        {
+          /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+          if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+            {
+              GSource *source;
+              source = g_socket_create_source (data->worker->socket,
+                                               G_IO_OUT | G_IO_HUP | G_IO_ERR,
+                                               data->worker->cancellable);
+              g_source_set_callback (source,
+                                     (GSourceFunc) on_socket_ready,
+                                     data,
+                                     NULL); /* GDestroyNotify */
+              g_source_attach (source, g_main_context_get_thread_default ());
+              g_source_unref (source);
+              g_error_free (error);
+              goto out;
+            }
+          g_simple_async_result_take_error (simple, error);
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
+          goto out;
+        }
+      g_assert (bytes_written > 0); /* zero is never returned */
+
+      write_message_print_transport_debug (bytes_written, data);
+
+      data->total_written += bytes_written;
+      g_assert (data->total_written <= data->blob_size);
+      if (data->total_written == data->blob_size)
+        {
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
+          goto out;
+        }
+
+      write_message_continue_writing (data);
+    }
+#endif
+  else
+    {
+#ifdef G_OS_UNIX
+      if (fd_list != NULL)
+        {
+          g_simple_async_result_set_error (simple,
+                                           G_IO_ERROR,
+                                           G_IO_ERROR_FAILED,
+                                           "Tried sending a file descriptor on unsupported stream of type %s",
+                                           g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+          g_simple_async_result_complete (simple);
+          g_object_unref (simple);
+          goto out;
+        }
+#endif
+
+      g_output_stream_write_async (ostream,
+                                   (const gchar *) data->blob + data->total_written,
+                                   data->blob_size - data->total_written,
+                                   G_PRIORITY_DEFAULT,
+                                   data->worker->cancellable,
+                                   write_message_async_cb,
+                                   data);
+    }
+#ifdef G_OS_UNIX
+ out:
+#endif
+  ;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_async (GDBusWorker         *worker,
+                     MessageToWriteData  *data,
+                     GAsyncReadyCallback  callback,
+                     gpointer             user_data)
+{
+  data->simple = g_simple_async_result_new (NULL,
+                                            callback,
+                                            user_data,
+                                            write_message_async);
+  data->total_written = 0;
+  write_message_continue_writing (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
+static gboolean
+write_message_finish (GAsyncResult   *res,
+                      GError        **error)
+{
+  g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async);
+  if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
+    return FALSE;
+  else
+    return TRUE;
+}
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* can be called from any thread - steals blob */
-void
-_g_dbus_worker_send_message (GDBusWorker    *worker,
-                             GDBusMessage   *message,
-                             gchar          *blob,
-                             gsize           blob_len)
+static void continue_writing (GDBusWorker *worker);
+
+typedef struct
+{
+  GDBusWorker *worker;
+  GList *flushers;
+} FlushAsyncData;
+
+static void
+flush_data_list_complete (const GList  *flushers,
+                          const GError *error)
+{
+  const GList *l;
+
+  for (l = flushers; l != NULL; l = l->next)
+    {
+      FlushData *f = l->data;
+
+      f->error = error != NULL ? g_error_copy (error) : NULL;
+
+      g_mutex_lock (&f->mutex);
+      g_cond_signal (&f->cond);
+      g_mutex_unlock (&f->mutex);
+    }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_FLUSH on entry
+ */
+static void
+ostream_flush_cb (GObject      *source_object,
+                  GAsyncResult *res,
+                  gpointer      user_data)
+{
+  FlushAsyncData *data = user_data;
+  GError *error;
+
+  error = NULL;
+  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+                                res,
+                                &error);
+
+  if (error == NULL)
+    {
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
+        {
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- FLUSHED stream of type %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+          _g_dbus_debug_print_unlock ();
+        }
+    }
+
+  g_assert (data->flushers != NULL);
+  flush_data_list_complete (data->flushers, error);
+  g_list_free (data->flushers);
+
+  if (error != NULL)
+    g_error_free (error);
+
+  /* Make sure we tell folks that we don't have additional
+     flushes pending */
+  g_mutex_lock (&data->worker->write_lock);
+  data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+  g_assert (data->worker->output_pending == PENDING_FLUSH);
+  data->worker->output_pending = PENDING_NONE;
+  g_mutex_unlock (&data->worker->write_lock);
+
+  /* OK, cool, finally kick off the next write */
+  continue_writing (data->worker);
+
+  _g_dbus_worker_unref (data->worker);
+  g_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_FLUSH on entry
+ */
+static void
+start_flush (FlushAsyncData *data)
+{
+  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                               G_PRIORITY_DEFAULT,
+                               data->worker->cancellable,
+                               ostream_flush_cb,
+                               data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+                          MessageToWriteData *message_data)
+{
+  if (G_UNLIKELY (_g_dbus_debug_message ()))
+    {
+      gchar *s;
+      _g_dbus_debug_print_lock ();
+      g_print ("========================================================================\n"
+               "GDBus-debug:Message:\n"
+               "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
+               message_data->blob_size);
+      s = g_dbus_message_print (message_data->message, 2);
+      g_print ("%s", s);
+      g_free (s);
+      if (G_UNLIKELY (_g_dbus_debug_payload ()))
+        {
+          s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
+          g_print ("%s\n", s);
+          g_free (s);
+        }
+      _g_dbus_debug_print_unlock ();
+    }
+
+  worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+  GList *l;
+  GList *ll;
+  GList *flushers;
+
+  flushers = NULL;
+  for (l = worker->write_pending_flushes; l != NULL; l = ll)
+    {
+      FlushData *f = l->data;
+      ll = l->next;
+
+      if (f->number_to_wait_for == worker->write_num_messages_written)
+        {
+          flushers = g_list_append (flushers, f);
+          worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+        }
+    }
+  if (flushers != NULL)
+    {
+      g_assert (worker->output_pending == PENDING_NONE);
+      worker->output_pending = PENDING_FLUSH;
+    }
+
+  if (flushers != NULL)
+    {
+      FlushAsyncData *data;
+
+      data = g_new0 (FlushAsyncData, 1);
+      data->worker = _g_dbus_worker_ref (worker);
+      data->flushers = flushers;
+      return data;
+    }
+
+  return NULL;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
+static void
+write_message_cb (GObject       *source_object,
+                  GAsyncResult  *res,
+                  gpointer       user_data)
+{
+  MessageToWriteData *data = user_data;
+  GError *error;
+
+  g_mutex_lock (&data->worker->write_lock);
+  g_assert (data->worker->output_pending == PENDING_WRITE);
+  data->worker->output_pending = PENDING_NONE;
+
+  error = NULL;
+  if (!write_message_finish (res, &error))
+    {
+      g_mutex_unlock (&data->worker->write_lock);
+
+      /* TODO: handle */
+      _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
+      g_error_free (error);
+
+      g_mutex_lock (&data->worker->write_lock);
+    }
+
+  message_written_unlocked (data->worker, data);
+
+  g_mutex_unlock (&data->worker->write_lock);
+
+  continue_writing (data->worker);
+
+  message_to_write_data_free (data);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_CLOSE on entry
+ */
+static void
+iostream_close_cb (GObject      *source_object,
+                   GAsyncResult *res,
+                   gpointer      user_data)
+{
+  GDBusWorker *worker = user_data;
+  GError *error = NULL;
+  GList *pending_close_attempts, *pending_flush_attempts;
+  GQueue *send_queue;
+
+  g_io_stream_close_finish (worker->stream, res, &error);
+
+  g_mutex_lock (&worker->write_lock);
+
+  pending_close_attempts = worker->pending_close_attempts;
+  worker->pending_close_attempts = NULL;
+
+  pending_flush_attempts = worker->write_pending_flushes;
+  worker->write_pending_flushes = NULL;
+
+  send_queue = worker->write_queue;
+  worker->write_queue = g_queue_new ();
+
+  g_assert (worker->output_pending == PENDING_CLOSE);
+  worker->output_pending = PENDING_NONE;
+
+  g_mutex_unlock (&worker->write_lock);
+
+  while (pending_close_attempts != NULL)
+    {
+      CloseData *close_data = pending_close_attempts->data;
+
+      pending_close_attempts = g_list_delete_link (pending_close_attempts,
+                                                   pending_close_attempts);
+
+      if (close_data->result != NULL)
+        {
+          if (error != NULL)
+            g_simple_async_result_set_from_error (close_data->result, error);
+
+          /* this must be in an idle because the result is likely to be
+           * intended for another thread
+           */
+          g_simple_async_result_complete_in_idle (close_data->result);
+        }
+
+      close_data_free (close_data);
+    }
+
+  g_clear_error (&error);
+
+  /* all messages queued for sending are discarded */
+  g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
+  /* all queued flushes fail */
+  error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+                       _("Operation was cancelled"));
+  flush_data_list_complete (pending_flush_attempts, error);
+  g_list_free (pending_flush_attempts);
+  g_clear_error (&error);
+
+  _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be PENDING_NONE on entry
+ */
+static void
+continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
+  FlushAsyncData *flush_async_data;
 
-  g_return_if_fail (G_IS_DBUS_MESSAGE (message));
-  g_return_if_fail (blob != NULL);
-  g_return_if_fail (blob_len > 16);
+ write_next:
+  /* we mustn't try to write two things at once */
+  g_assert (worker->output_pending == PENDING_NONE);
 
-  data = g_new0 (MessageToWriteData, 1);
-  data->message = g_object_ref (message);
-  data->blob = blob; /* steal! */
-  data->blob_size = blob_len;
+  g_mutex_lock (&worker->write_lock);
+
+  data = NULL;
+  flush_async_data = NULL;
 
-  g_mutex_lock (worker->write_lock);
-  g_queue_push_tail (worker->write_queue, data);
-  if (!worker->write_is_pending)
+  /* if we want to close the connection, that takes precedence */
+  if (worker->pending_close_attempts != NULL)
     {
-      GSource *idle_source;
+      worker->close_expected = TRUE;
+      worker->output_pending = PENDING_CLOSE;
+
+      g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+                               NULL, iostream_close_cb,
+                               _g_dbus_worker_ref (worker));
+    }
+  else
+    {
+      flush_async_data = prepare_flush_unlocked (worker);
+
+      if (flush_async_data == NULL)
+        {
+          data = g_queue_pop_head (worker->write_queue);
+
+          if (data != NULL)
+            worker->output_pending = PENDING_WRITE;
+        }
+    }
+
+  g_mutex_unlock (&worker->write_lock);
+
+  /* Note that write_lock is only used for protecting the @write_queue
+   * and @output_pending fields of the GDBusWorker struct ... which we
+   * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+   *
+   * Therefore, it's fine to drop it here when calling back into user
+   * code and then writing the message out onto the GIOStream since this
+   * function only runs on the worker thread.
+   */
+
+  if (flush_async_data != NULL)
+    {
+      start_flush (flush_async_data);
+      g_assert (data == NULL);
+    }
+  else if (data != NULL)
+    {
+      GDBusMessage *old_message;
+      guchar *new_blob;
+      gsize new_blob_size;
+      GError *error;
+
+      old_message = data->message;
+      data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+      if (data->message == old_message)
+        {
+          /* filters had no effect - do nothing */
+        }
+      else if (data->message == NULL)
+        {
+          /* filters dropped message */
+          g_mutex_lock (&worker->write_lock);
+          worker->output_pending = PENDING_NONE;
+          g_mutex_unlock (&worker->write_lock);
+          message_to_write_data_free (data);
+          goto write_next;
+        }
+      else
+        {
+          /* filters altered the message -> reencode */
+          error = NULL;
+          new_blob = g_dbus_message_to_blob (data->message,
+                                             &new_blob_size,
+                                             worker->capabilities,
+                                             &error);
+          if (new_blob == NULL)
+            {
+              /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+               * the old message instead
+               */
+              g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+                         g_dbus_message_get_serial (data->message),
+                         error->message);
+              g_error_free (error);
+            }
+          else
+            {
+              g_free (data->blob);
+              data->blob = (gchar *) new_blob;
+              data->blob_size = new_blob_size;
+            }
+        }
+
+      write_message_async (worker,
+                           data,
+                           write_message_cb,
+                           data);
+    }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be anything
+ */
+static gboolean
+continue_writing_in_idle_cb (gpointer user_data)
+{
+  GDBusWorker *worker = user_data;
+
+  /* Because this is the worker thread, we can read this struct member
+   * without holding the lock: no other thread ever modifies it.
+   */
+  if (worker->output_pending == PENDING_NONE)
+    continue_writing (worker);
+
+  return FALSE;
+}
+
+/**
+ * @write_data: (transfer full) (allow-none):
+ * @flush_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is held on entry
+ * output_pending may be anything
+ */
+static void
+schedule_writing_unlocked (GDBusWorker        *worker,
+                           MessageToWriteData *write_data,
+                           FlushData          *flush_data,
+                           CloseData          *close_data)
+{
+  if (write_data != NULL)
+    g_queue_push_tail (worker->write_queue, write_data);
 
-      worker->write_is_pending = TRUE;
+  if (flush_data != NULL)
+    worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
 
+  if (close_data != NULL)
+    worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+                                                     close_data);
+
+  /* If we had output pending, the next bit of output will happen
+   * automatically when it finishes, so we only need to do this
+   * if nothing was pending.
+   *
+   * The idle callback will re-check that output_pending is still
+   * PENDING_NONE, to guard against output starting before the idle.
+   */
+  if (worker->output_pending == PENDING_NONE)
+    {
+      GSource *idle_source;
       idle_source = g_idle_source_new ();
       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
       g_source_set_callback (idle_source,
-                             write_message_in_idle_cb,
+                             continue_writing_in_idle_cb,
                              _g_dbus_worker_ref (worker),
                              (GDestroyNotify) _g_dbus_worker_unref);
-      g_source_attach (idle_source, shared_thread_data->context);
+      g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
+      g_source_attach (idle_source, worker->shared_thread_data->context);
       g_source_unref (idle_source);
     }
-  g_mutex_unlock (worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-static void
-_g_dbus_worker_thread_begin_func (gpointer user_data)
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_send_message (GDBusWorker    *worker,
+                             GDBusMessage   *message,
+                             gchar          *blob,
+                             gsize           blob_len)
 {
-  GDBusWorker *worker = user_data;
+  MessageToWriteData *data;
 
-  worker->thread = g_thread_self ();
+  g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+  g_return_if_fail (blob != NULL);
+  g_return_if_fail (blob_len > 16);
 
-  /* begin reading */
-  _g_dbus_worker_do_read (worker);
+  data = g_new0 (MessageToWriteData, 1);
+  data->worker = _g_dbus_worker_ref (worker);
+  data->message = g_object_ref (message);
+  data->blob = blob; /* steal! */
+  data->blob_size = blob_len;
+
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, data, NULL, NULL);
+  g_mutex_unlock (&worker->write_lock);
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
 GDBusWorker *
 _g_dbus_worker_new (GIOStream                              *stream,
                     GDBusCapabilityFlags                    capabilities,
@@ -1030,6 +1783,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
                     gpointer                                user_data)
 {
   GDBusWorker *worker;
+  GSource *idle_source;
 
   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
   g_return_val_if_fail (message_received_callback != NULL, NULL);
@@ -1039,7 +1793,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker = g_new0 (GDBusWorker, 1);
   worker->ref_count = 1;
 
-  worker->read_lock = g_mutex_new ();
+  g_mutex_init (&worker->read_lock);
   worker->message_received_callback = message_received_callback;
   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
   worker->disconnected_callback = disconnected_callback;
@@ -1047,44 +1801,170 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->stream = g_object_ref (stream);
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
+  worker->output_pending = PENDING_NONE;
 
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
 
-  worker->write_lock = g_mutex_new ();
+  g_mutex_init (&worker->write_lock);
   worker->write_queue = g_queue_new ();
 
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
-  _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+#endif
+
+  worker->shared_thread_data = _g_dbus_shared_thread_ref ();
+
+  /* begin reading */
+  idle_source = g_idle_source_new ();
+  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (idle_source,
+                         _g_dbus_worker_do_initial_read,
+                         _g_dbus_worker_ref (worker),
+                         (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
+  g_source_attach (idle_source, worker->shared_thread_data->context);
+  g_source_unref (idle_source);
 
   return worker;
 }
 
-/* This can be called from any thread - frees worker - guarantees no callbacks
- * will ever be issued again
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_close (GDBusWorker         *worker,
+                      GCancellable        *cancellable,
+                      GSimpleAsyncResult  *result)
+{
+  CloseData *close_data;
+
+  close_data = g_slice_new0 (CloseData);
+  close_data->worker = _g_dbus_worker_ref (worker);
+  close_data->cancellable =
+      (cancellable == NULL ? NULL : g_object_ref (cancellable));
+  close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+  /* Don't set worker->close_expected here - we're in the wrong thread.
+   * It'll be set before the actual close happens.
+   */
+  g_cancellable_cancel (worker->cancellable);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, NULL, NULL, close_data);
+  g_mutex_unlock (&worker->write_lock);
+}
+
+/* This can be called from any thread - frees worker. Note that
+ * callbacks might still happen if called from another thread than the
+ * worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
 {
-  /* If we're called in the worker thread it means we are called from
-   * a worker callback. This is fine, we just can't lock in that case since
-   * we're already holding the lock...
+  g_atomic_int_set (&worker->stopped, TRUE);
+
+  /* Cancel any pending operations and schedule a close of the underlying I/O
+   * stream in the worker thread
    */
-  if (g_thread_self () != worker->thread)
-    g_mutex_lock (worker->read_lock);
-  worker->stopped = TRUE;
-  if (g_thread_self () != worker->thread)
-    g_mutex_unlock (worker->read_lock);
+  _g_dbus_worker_close (worker, NULL, NULL);
 
-  g_cancellable_cancel (worker->cancellable);
+  /* _g_dbus_worker_close holds a ref until after an idle in the worker
+   * thread has run, so we no longer need to unref in an idle like in
+   * commit 322e25b535
+   */
   _g_dbus_worker_unref (worker);
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks
+ * calling thread until all queued outgoing messages are written and
+ * the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker    *worker,
+                           GCancellable   *cancellable,
+                           GError        **error)
+{
+  gboolean ret;
+  FlushData *data;
+  guint64 pending_writes;
+
+  data = NULL;
+  ret = TRUE;
+
+  g_mutex_lock (&worker->write_lock);
+
+  /* if the queue is empty, no write is in-flight and we haven't written
+   * anything since the last flush, then there's nothing to wait for
+   */
+  pending_writes = g_queue_get_length (worker->write_queue);
+
+  /* if a write is in-flight, we shouldn't be satisfied until the first
+   * flush operation that follows it
+   */
+  if (worker->output_pending == PENDING_WRITE)
+    pending_writes += 1;
+
+  if (pending_writes > 0 ||
+      worker->write_num_messages_written != worker->write_num_messages_flushed)
+    {
+      data = g_new0 (FlushData, 1);
+      g_mutex_init (&data->mutex);
+      g_cond_init (&data->cond);
+      data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
+      g_mutex_lock (&data->mutex);
+
+      schedule_writing_unlocked (worker, NULL, data, NULL);
+    }
+  g_mutex_unlock (&worker->write_lock);
+
+  if (data != NULL)
+    {
+      g_cond_wait (&data->cond, &data->mutex);
+      g_mutex_unlock (&data->mutex);
+
+      /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
+      g_cond_clear (&data->cond);
+      g_mutex_clear (&data->mutex);
+      if (data->error != NULL)
+        {
+          ret = FALSE;
+          g_propagate_error (error, data->error);
+        }
+      g_free (data);
+    }
+
+  return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
-#define G_DBUS_DEBUG_MESSAGE        (1<<1)
-#define G_DBUS_DEBUG_ALL            0xffffffff
+#define G_DBUS_DEBUG_TRANSPORT      (1<<1)
+#define G_DBUS_DEBUG_MESSAGE        (1<<2)
+#define G_DBUS_DEBUG_PAYLOAD        (1<<3)
+#define G_DBUS_DEBUG_CALL           (1<<4)
+#define G_DBUS_DEBUG_SIGNAL         (1<<5)
+#define G_DBUS_DEBUG_INCOMING       (1<<6)
+#define G_DBUS_DEBUG_RETURN         (1<<7)
+#define G_DBUS_DEBUG_EMISSION       (1<<8)
+#define G_DBUS_DEBUG_ADDRESS        (1<<9)
+
 static gint _gdbus_debug_flags = 0;
 
 gboolean
@@ -1095,13 +1975,83 @@ _g_dbus_debug_authentication (void)
 }
 
 gboolean
+_g_dbus_debug_transport (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
+}
+
+gboolean
 _g_dbus_debug_message (void)
 {
   _g_dbus_initialize ();
   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
 }
 
-/*
+gboolean
+_g_dbus_debug_payload (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
+}
+
+gboolean
+_g_dbus_debug_call (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
+}
+
+gboolean
+_g_dbus_debug_signal (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
+}
+
+gboolean
+_g_dbus_debug_incoming (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
+}
+
+gboolean
+_g_dbus_debug_return (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
+}
+
+gboolean
+_g_dbus_debug_emission (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
+}
+
+gboolean
+_g_dbus_debug_address (void)
+{
+  _g_dbus_initialize ();
+  return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
+}
+
+G_LOCK_DEFINE_STATIC (print_lock);
+
+void
+_g_dbus_debug_print_lock (void)
+{
+  G_LOCK (print_lock);
+}
+
+void
+_g_dbus_debug_print_unlock (void)
+{
+  G_UNLOCK (print_lock);
+}
+
+/**
  * _g_dbus_initialize:
  *
  * Does various one-time init things such as
@@ -1120,23 +2070,27 @@ _g_dbus_initialize (void)
       const gchar *debug;
 
       g_dbus_error_domain = G_DBUS_ERROR;
+      (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
 
       debug = g_getenv ("G_DBUS_DEBUG");
       if (debug != NULL)
         {
-          gchar **tokens;
-          guint n;
-          tokens = g_strsplit (debug, ",", 0);
-          for (n = 0; tokens[n] != NULL; n++)
-            {
-              if (g_strcmp0 (tokens[n], "authentication") == 0)
-                _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION;
-              else if (g_strcmp0 (tokens[n], "message") == 0)
-                _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
-              else if (g_strcmp0 (tokens[n], "all") == 0)
-                _gdbus_debug_flags |= G_DBUS_DEBUG_ALL;
-            }
-          g_strfreev (tokens);
+          const GDebugKey keys[] = {
+            { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
+            { "transport",      G_DBUS_DEBUG_TRANSPORT      },
+            { "message",        G_DBUS_DEBUG_MESSAGE        },
+            { "payload",        G_DBUS_DEBUG_PAYLOAD        },
+            { "call",           G_DBUS_DEBUG_CALL           },
+            { "signal",         G_DBUS_DEBUG_SIGNAL         },
+            { "incoming",       G_DBUS_DEBUG_INCOMING       },
+            { "return",         G_DBUS_DEBUG_RETURN         },
+            { "emission",       G_DBUS_DEBUG_EMISSION       },
+            { "address",        G_DBUS_DEBUG_ADDRESS        }
+          };
+
+          _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
+          if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
+            _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
         }
 
       g_once_init_leave (&initialized, 1);
@@ -1239,5 +2193,186 @@ out:
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-#define __G_DBUS_PRIVATE_C__
-#include "gioaliasdef.c"
+gchar *
+_g_dbus_get_machine_id (GError **error)
+{
+#ifdef G_OS_WIN32
+  HW_PROFILE_INFOA info;
+  char *src, *dest, *res;
+  int i;
+
+  if (!GetCurrentHwProfileA (&info))
+    {
+      char *message = g_win32_error_message (GetLastError ());
+      g_set_error (error,
+                  G_IO_ERROR,
+                  G_IO_ERROR_FAILED,
+                  _("Unable to get Hardware profile: %s"), message);
+      g_free (message);
+      return NULL;
+    }
+
+  /* Form: {12340001-4980-1920-6788-123456789012} */
+  src = &info.szHwProfileGuid[0];
+
+  res = g_malloc (32+1);
+  dest = res;
+
+  src++; /* Skip { */
+  for (i = 0; i < 8; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 12; i++)
+    *dest++ = *src++;
+  *dest = 0;
+
+  return res;
+#else
+  gchar *ret;
+  GError *first_error;
+  /* TODO: use PACKAGE_LOCALSTATEDIR ? */
+  ret = NULL;
+  first_error = NULL;
+  if (!g_file_get_contents ("/var/lib/dbus/machine-id",
+                            &ret,
+                            NULL,
+                            &first_error) &&
+      !g_file_get_contents ("/etc/machine-id",
+                            &ret,
+                            NULL,
+                            NULL))
+    {
+      g_propagate_prefixed_error (error, first_error,
+                                  _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
+    }
+  else
+    {
+      /* ignore the error from the first try, if any */
+      g_clear_error (&first_error);
+      /* TODO: validate value */
+      g_strstrip (ret);
+    }
+  return ret;
+#endif
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gchar *
+_g_dbus_enum_to_string (GType enum_type, gint value)
+{
+  gchar *ret;
+  GEnumClass *klass;
+  GEnumValue *enum_value;
+
+  klass = g_type_class_ref (enum_type);
+  enum_value = g_enum_get_value (klass, value);
+  if (enum_value != NULL)
+    ret = g_strdup (enum_value->value_nick);
+  else
+    ret = g_strdup_printf ("unknown (value %d)", value);
+  g_type_class_unref (klass);
+  return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+write_message_print_transport_debug (gssize bytes_written,
+                                     MessageToWriteData *data)
+{
+  if (G_LIKELY (!_g_dbus_debug_transport ()))
+    goto out;
+
+  _g_dbus_debug_print_lock ();
+  g_print ("========================================================================\n"
+           "GDBus-debug:Transport:\n"
+           "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
+           "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
+           bytes_written,
+           g_dbus_message_get_serial (data->message),
+           data->blob_size,
+           data->total_written,
+           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+  _g_dbus_debug_print_unlock ();
+ out:
+  ;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+read_message_print_transport_debug (gssize bytes_read,
+                                    GDBusWorker *worker)
+{
+  gsize size;
+  gint32 serial;
+  gint32 message_length;
+
+  if (G_LIKELY (!_g_dbus_debug_transport ()))
+    goto out;
+
+  size = bytes_read + worker->read_buffer_cur_size;
+  serial = 0;
+  message_length = 0;
+  if (size >= 16)
+    message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
+  if (size >= 1)
+    {
+      switch (worker->read_buffer[0])
+        {
+        case 'l':
+          if (size >= 12)
+            serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
+          break;
+        case 'B':
+          if (size >= 12)
+            serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
+          break;
+        default:
+          /* an error will be set elsewhere if this happens */
+          goto out;
+        }
+    }
+
+    _g_dbus_debug_print_lock ();
+  g_print ("========================================================================\n"
+           "GDBus-debug:Transport:\n"
+           "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
+           "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
+           bytes_read,
+           serial,
+           message_length,
+           worker->read_buffer_cur_size,
+           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
+  _g_dbus_debug_print_unlock ();
+ out:
+  ;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gboolean
+_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
+                                     GValue                *return_accu,
+                                     const GValue          *handler_return,
+                                     gpointer               dummy)
+{
+  gboolean continue_emission;
+  gboolean signal_return;
+
+  signal_return = g_value_get_boolean (handler_return);
+  g_value_set_boolean (return_accu, signal_return);
+  continue_emission = signal_return;
+
+  return continue_emission;
+}