[kdbus] Enable 'initial_read()' - is no longer needed
[platform/upstream/glib.git] / gio / gdbusprivate.c
index 7ccf98f..91feb41 100644 (file)
@@ -13,9 +13,7 @@
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
  *
  * Author: David Zeuthen <davidz@redhat.com>
  */
@@ -24,9 +22,6 @@
 
 #include <stdlib.h>
 #include <string.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
 
 #include "giotypes.h"
 #include "gsocket.h"
 #include "ginputstream.h"
 #include "gmemoryinputstream.h"
 #include "giostream.h"
+#include "glib/gstdio.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
 #include "gsocketoutputstream.h"
 
 #ifdef G_OS_UNIX
+#include "gkdbus.h"
+#include "gkdbusconnection.h"
 #include "gunixfdmessage.h"
 #include "gunixconnection.h"
 #include "gunixcredentialsmessage.h"
@@ -94,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+typedef struct
+{
+  GKdbus *kdbus;
+  GCancellable *cancellable;
+
+  GSimpleAsyncResult *simple;
+
+  gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData  *data)
+{
+  g_object_unref (data->kdbus);
+  if (data->cancellable != NULL)
+    g_object_unref (data->cancellable);
+  g_object_unref (data->simple);
+  g_free (data);
+}
+
+static gboolean
+_g_kdbus_read_ready (GKdbus        *kdbus,
+                     GIOCondition   condition,
+                     gpointer       user_data)
+{
+  ReadKdbusData *data = user_data;
+  GError *error = NULL;
+  gssize result;
+
+  result = _g_kdbus_receive (data->kdbus,
+                             data->cancellable,
+                             &error);
+
+  if (result >= 0)
+    {
+      g_simple_async_result_set_op_res_gssize (data->simple, result);
+    }
+  else
+    {
+      g_assert (error != NULL);
+      g_simple_async_result_take_error (data->simple, error);
+    }
+
+  if (data->from_mainloop)
+    g_simple_async_result_complete (data->simple);
+  else
+    g_simple_async_result_complete_in_idle (data->simple);
+
+  return FALSE;
+}
+
+static void
+_g_kdbus_read (GKdbus               *kdbus,
+               GCancellable         *cancellable,
+               GAsyncReadyCallback   callback,
+               gpointer              user_data)
+{
+  ReadKdbusData *data;
+  GSource *source;
+
+  data = g_new0 (ReadKdbusData, 1);
+  data->kdbus = g_object_ref (kdbus);
+  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+
+  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+                                            callback,
+                                            user_data,
+                                            _g_kdbus_read);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+  data->from_mainloop = TRUE;
+  source = _g_kdbus_create_source (data->kdbus,
+                                   G_IO_IN,
+                                   cancellable);
+  g_source_set_callback (source,
+                         (GSourceFunc) _g_kdbus_read_ready,
+                         data,
+                         (GDestroyNotify) read_kdbus_data_free);
+  g_source_attach (source, g_main_context_get_thread_default ());
+  g_source_unref (source);
+}
+
+static gssize
+_g_kdbus_read_finish (GKdbus        *kdbus,
+                      GAsyncResult  *result,
+                      GError       **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    return -1;
+  else
+    return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
+
 /* Unfortunately ancillary messages are discarded when reading from a
  * socket using the GSocketInputStream abstraction. So we provide a
  * very GInputStream-ish API that uses GSocket in this case (very
@@ -191,6 +290,7 @@ _g_socket_read_with_control_messages (GSocket                 *socket,
                                             callback,
                                             user_data,
                                             _g_socket_read_with_control_messages);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
 
   if (!g_socket_condition_check (socket, G_IO_IN))
     {
@@ -287,7 +387,6 @@ static SharedThreadData *
 _g_dbus_shared_thread_ref (void)
 {
   static gsize shared_thread_data = 0;
-  GError *error = NULL;
   SharedThreadData *ret;
 
   if (g_once_init_enter (&shared_thread_data))
@@ -304,10 +403,7 @@ _g_dbus_shared_thread_ref (void)
       data->loop = g_main_loop_new (data->context, FALSE);
       data->thread = g_thread_new ("gdbus",
                                    gdbus_shared_thread_func,
-                                   data,
-                                   TRUE,
-                                   &error);
-      g_assert_no_error (error);
+                                   data);
       /* We can cast between gsize and gpointer safely */
       g_once_init_leave (&shared_thread_data, (gsize) data);
     }
@@ -335,6 +431,13 @@ _g_dbus_shared_thread_unref (SharedThreadData *data)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+typedef enum {
+    PENDING_NONE = 0,
+    PENDING_WRITE,
+    PENDING_FLUSH,
+    PENDING_CLOSE
+} OutputPending;
+
 struct GDBusWorker
 {
   volatile gint                       ref_count;
@@ -359,8 +462,11 @@ struct GDBusWorker
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  /* if not NULL, stream is GSocketConnection */
+  /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
   GSocket *socket;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  GKdbus  *kdbus;
+#endif
 
   /* used for reading */
   GMutex                              read_lock;
@@ -372,22 +478,28 @@ struct GDBusWorker
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
 
-  /* TRUE if an async write, flush or close is pending.
+  /* Whether an async write, flush or close, or none of those, is pending.
    * Only the worker thread may change its value, and only with the write_lock.
    * Other threads may read its value when holding the write_lock.
    * The worker thread may read its value at any time.
    */
-  gboolean                            output_pending;
+  OutputPending                       output_pending;
   /* used for writing */
   GMutex                              write_lock;
   /* queue of MessageToWriteData, protected by write_lock */
   GQueue                             *write_queue;
   /* protected by write_lock */
   guint64                             write_num_messages_written;
+  /* number of messages we'd written out last time we flushed;
+   * protected by write_lock
+   */
+  guint64                             write_num_messages_flushed;
   /* list of FlushData, protected by write_lock */
   GList                              *write_pending_flushes;
   /* list of CloseData, protected by write_lock */
   GList                              *pending_close_attempts;
+  /* no lock - only used from the worker thread */
+  gboolean                            close_expected;
 };
 
 static void _g_dbus_worker_unref (GDBusWorker *worker);
@@ -456,13 +568,9 @@ _g_dbus_worker_unref (GDBusWorker *worker)
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
 
-      g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
-      g_queue_free (worker->received_messages_while_frozen);
-
+      g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
       g_mutex_clear (&worker->write_lock);
-      g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
-      g_queue_free (worker->write_queue);
-
+      g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
       g_free (worker->read_buffer);
 
       g_free (worker);
@@ -552,6 +660,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker)
                          unfreeze_in_idle_cb,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 }
@@ -577,7 +686,21 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     goto out;
 
   error = NULL;
-  if (worker->socket == NULL)
+  bytes_read = 0;
+
+  if (FALSE)
+    {
+    }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  else if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      bytes_read = _g_kdbus_read_finish (worker->kdbus,
+                                         res,
+                                         &error);
+      g_error ("[KDBUS] _g_dbus_worker_do_read_cb() - work in progress");
+    }
+#endif
+  else if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -615,7 +738,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                     {
                       /* TODO: really want a append_steal() */
                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
-                      close (fds[n]);
+                      (void) g_close (fds[n], NULL);
                     }
                 }
               g_free (fds);
@@ -651,7 +774,40 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
   if (bytes_read == -1)
     {
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
+        {
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- READ ERROR on stream of type %s:\n"
+                   "  ---- %s %d: %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+                   g_quark_to_string (error->domain), error->code,
+                   error->message);
+          _g_dbus_debug_print_unlock ();
+        }
+
+      /* Every async read that uses this callback uses worker->cancellable
+       * as its GCancellable. worker->cancellable gets cancelled if and only
+       * if the GDBusConnection tells us to close (either via
+       * _g_dbus_worker_stop, which is called on last-unref, or directly),
+       * so a cancelled read must mean our connection was closed locally.
+       *
+       * If we're closing, other errors are possible - notably,
+       * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+       * read in-flight. It seems sensible to treat all read errors during
+       * closing as an expected thing that doesn't trip exit-on-close.
+       *
+       * Because close_expected can't be set until we get into the worker
+       * thread, but the cancellable is signalled sooner (from another
+       * thread), we do still need to check the error.
+       */
+      if (worker->close_expected ||
+          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+      else
+        _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
       g_error_free (error);
       goto out;
     }
@@ -695,7 +851,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                                                      &error);
           if (message_len == -1)
             {
-              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
+              g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
               g_error_free (error);
               goto out;
@@ -787,6 +943,28 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 static void
 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 {
+  /* Note that we do need to keep trying to read even if close_expected is
+   * true, because only failing a read causes us to signal 'closed'.
+   */
+
+  /* [KDBUS]
+   * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
+   * instead of it we use kdbus memory pool. On connection stage KDBUS client
+   * have to register a memory pool, large enough to  carry all backlog of
+   * data enqueued for the connection.
+   */
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      _g_kdbus_read(worker->kdbus,
+                    worker->cancellable,
+                    (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+                    _g_dbus_worker_ref (worker));
+      return;
+    }
+#endif
+
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -868,7 +1046,7 @@ static void write_message_continue_writing (MessageToWriteData *data);
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_async_cb (GObject      *source_object,
@@ -918,8 +1096,9 @@ write_message_async_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
+#ifdef G_OS_UNIX
 static gboolean
 on_socket_ready (GSocket      *socket,
                  GIOCondition  condition,
@@ -929,25 +1108,28 @@ on_socket_ready (GSocket      *socket,
   write_message_continue_writing (data);
   return FALSE; /* remove source */
 }
+#endif
 
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_continue_writing (MessageToWriteData *data)
 {
   GOutputStream *ostream;
-  GSimpleAsyncResult *simple;
 #ifdef G_OS_UNIX
+  GSimpleAsyncResult *simple;
   GUnixFDList *fd_list;
 #endif
 
+#ifdef G_OS_UNIX
   /* Note: we can't access data->simple after calling g_async_result_complete () because the
    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
   simple = data->simple;
+#endif
 
   ostream = g_io_stream_get_output_stream (data->worker->stream);
 #ifdef G_OS_UNIX
@@ -1063,14 +1245,16 @@ write_message_continue_writing (MessageToWriteData *data)
                                    write_message_async_cb,
                                    data);
     }
+#ifdef G_OS_UNIX
  out:
+#endif
   ;
 }
 
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_async (GDBusWorker         *worker,
@@ -1086,7 +1270,7 @@ write_message_async (GDBusWorker         *worker,
   write_message_continue_writing (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
 static gboolean
 write_message_finish (GAsyncResult   *res,
                       GError        **error)
@@ -1099,7 +1283,7 @@ write_message_finish (GAsyncResult   *res,
 }
 /* ---------------------------------------------------------------------------------------------------- */
 
-static void maybe_write_next_message (GDBusWorker *worker);
+static void continue_writing (GDBusWorker *worker);
 
 typedef struct
 {
@@ -1128,7 +1312,7 @@ flush_data_list_complete (const GList  *flushers,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_FLUSH on entry
  */
 static void
 ostream_flush_cb (GObject      *source_object,
@@ -1166,12 +1350,13 @@ ostream_flush_cb (GObject      *source_object,
   /* Make sure we tell folks that we don't have additional
      flushes pending */
   g_mutex_lock (&data->worker->write_lock);
-  g_assert (data->worker->output_pending);
-  data->worker->output_pending = FALSE;
+  data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+  g_assert (data->worker->output_pending == PENDING_FLUSH);
+  data->worker->output_pending = PENDING_NONE;
   g_mutex_unlock (&data->worker->write_lock);
 
   /* OK, cool, finally kick off the next write */
-  maybe_write_next_message (data->worker);
+  continue_writing (data->worker);
 
   _g_dbus_worker_unref (data->worker);
   g_free (data);
@@ -1180,17 +1365,27 @@ ostream_flush_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is false on entry
+ * output_pending is PENDING_FLUSH on entry
  */
 static void
-message_written (GDBusWorker *worker,
-                 MessageToWriteData *message_data)
+start_flush (FlushAsyncData *data)
 {
-  GList *l;
-  GList *ll;
-  GList *flushers;
+  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                               G_PRIORITY_DEFAULT,
+                               data->worker->cancellable,
+                               ostream_flush_cb,
+                               data);
+}
 
-  /* first log the fact that we wrote a message */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+                          MessageToWriteData *message_data)
+{
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -1211,10 +1406,24 @@ message_written (GDBusWorker *worker,
       _g_dbus_debug_print_unlock ();
     }
 
-  /* then first wake up pending flushes and, if needed, flush the stream */
-  flushers = NULL;
-  g_mutex_lock (&worker->write_lock);
   worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+  GList *l;
+  GList *ll;
+  GList *flushers;
+
+  flushers = NULL;
   for (l = worker->write_pending_flushes; l != NULL; l = ll)
     {
       FlushData *f = l->data;
@@ -1228,35 +1437,27 @@ message_written (GDBusWorker *worker,
     }
   if (flushers != NULL)
     {
-      g_assert (!worker->output_pending);
-      worker->output_pending = TRUE;
+      g_assert (worker->output_pending == PENDING_NONE);
+      worker->output_pending = PENDING_FLUSH;
     }
-  g_mutex_unlock (&worker->write_lock);
 
   if (flushers != NULL)
     {
       FlushAsyncData *data;
+
       data = g_new0 (FlushAsyncData, 1);
       data->worker = _g_dbus_worker_ref (worker);
       data->flushers = flushers;
-      /* flush the stream before writing the next message */
-      g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
-                                   G_PRIORITY_DEFAULT,
-                                   worker->cancellable,
-                                   ostream_flush_cb,
-                                   data);
-    }
-  else
-    {
-      /* kick off the next write! */
-      maybe_write_next_message (worker);
+      return data;
     }
+
+  return NULL;
 }
 
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_cb (GObject       *source_object,
@@ -1267,23 +1468,26 @@ write_message_cb (GObject       *source_object,
   GError *error;
 
   g_mutex_lock (&data->worker->write_lock);
-  g_assert (data->worker->output_pending);
-  data->worker->output_pending = FALSE;
-  g_mutex_unlock (&data->worker->write_lock);
+  g_assert (data->worker->output_pending == PENDING_WRITE);
+  data->worker->output_pending = PENDING_NONE;
 
   error = NULL;
   if (!write_message_finish (res, &error))
     {
+      g_mutex_unlock (&data->worker->write_lock);
+
       /* TODO: handle */
       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
       g_error_free (error);
+
+      g_mutex_lock (&data->worker->write_lock);
     }
 
-  /* this function will also kick of the next write (it might need to
-   * flush so writing the next message might happen much later
-   * e.g. async)
-   */
-  message_written (data->worker, data);
+  message_written_unlocked (data->worker, data);
+
+  g_mutex_unlock (&data->worker->write_lock);
+
+  continue_writing (data->worker);
 
   message_to_write_data_free (data);
 }
@@ -1291,7 +1495,7 @@ write_message_cb (GObject       *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_CLOSE on entry
  */
 static void
 iostream_close_cb (GObject      *source_object,
@@ -1316,8 +1520,8 @@ iostream_close_cb (GObject      *source_object,
   send_queue = worker->write_queue;
   worker->write_queue = g_queue_new ();
 
-  g_assert (worker->output_pending);
-  worker->output_pending = FALSE;
+  g_assert (worker->output_pending == PENDING_CLOSE);
+  worker->output_pending = PENDING_NONE;
 
   g_mutex_unlock (&worker->write_lock);
 
@@ -1345,9 +1549,7 @@ iostream_close_cb (GObject      *source_object,
   g_clear_error (&error);
 
   /* all messages queued for sending are discarded */
-  g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
-  g_queue_free (send_queue);
-
+  g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
   /* all queued flushes fail */
   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
                        _("Operation was cancelled"));
@@ -1361,35 +1563,44 @@ iostream_close_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending must be false on entry
+ * output_pending must be PENDING_NONE on entry
  */
 static void
-maybe_write_next_message (GDBusWorker *worker)
+continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
+  FlushAsyncData *flush_async_data;
 
  write_next:
   /* we mustn't try to write two things at once */
-  g_assert (!worker->output_pending);
+  g_assert (worker->output_pending == PENDING_NONE);
 
   g_mutex_lock (&worker->write_lock);
 
+  data = NULL;
+  flush_async_data = NULL;
+
   /* if we want to close the connection, that takes precedence */
   if (worker->pending_close_attempts != NULL)
     {
-      worker->output_pending = TRUE;
+      worker->close_expected = TRUE;
+      worker->output_pending = PENDING_CLOSE;
 
       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
                                NULL, iostream_close_cb,
                                _g_dbus_worker_ref (worker));
-      data = NULL;
     }
   else
     {
-      data = g_queue_pop_head (worker->write_queue);
+      flush_async_data = prepare_flush_unlocked (worker);
 
-      if (data != NULL)
-        worker->output_pending = TRUE;
+      if (flush_async_data == NULL)
+        {
+          data = g_queue_pop_head (worker->write_queue);
+
+          if (data != NULL)
+            worker->output_pending = PENDING_WRITE;
+        }
     }
 
   g_mutex_unlock (&worker->write_lock);
@@ -1402,7 +1613,13 @@ maybe_write_next_message (GDBusWorker *worker)
    * code and then writing the message out onto the GIOStream since this
    * function only runs on the worker thread.
    */
-  if (data != NULL)
+
+  if (flush_async_data != NULL)
+    {
+      start_flush (flush_async_data);
+      g_assert (data == NULL);
+    }
+  else if (data != NULL)
     {
       GDBusMessage *old_message;
       guchar *new_blob;
@@ -1419,7 +1636,7 @@ maybe_write_next_message (GDBusWorker *worker)
         {
           /* filters dropped message */
           g_mutex_lock (&worker->write_lock);
-          worker->output_pending = FALSE;
+          worker->output_pending = PENDING_NONE;
           g_mutex_unlock (&worker->write_lock);
           message_to_write_data_free (data);
           goto write_next;
@@ -1460,59 +1677,68 @@ maybe_write_next_message (GDBusWorker *worker)
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 static gboolean
-write_message_in_idle_cb (gpointer user_data)
+continue_writing_in_idle_cb (gpointer user_data)
 {
   GDBusWorker *worker = user_data;
 
   /* Because this is the worker thread, we can read this struct member
    * without holding the lock: no other thread ever modifies it.
    */
-  if (!worker->output_pending)
-    maybe_write_next_message (worker);
+  if (worker->output_pending == PENDING_NONE)
+    continue_writing (worker);
 
   return FALSE;
 }
 
-/*
+/**
  * @write_data: (transfer full) (allow-none):
+ * @flush_data: (transfer full) (allow-none):
  * @close_data: (transfer full) (allow-none):
  *
  * Can be called from any thread
  *
- * write_lock is not held on entry
- * output_pending may be true or false
+ * write_lock is held on entry
+ * output_pending may be anything
  */
 static void
-schedule_write_in_worker_thread (GDBusWorker        *worker,
-                                 MessageToWriteData *write_data,
-                                 CloseData          *close_data)
+schedule_writing_unlocked (GDBusWorker        *worker,
+                           MessageToWriteData *write_data,
+                           FlushData          *flush_data,
+                           CloseData          *close_data)
 {
-  g_mutex_lock (&worker->write_lock);
-
   if (write_data != NULL)
     g_queue_push_tail (worker->write_queue, write_data);
 
+  if (flush_data != NULL)
+    worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
+
   if (close_data != NULL)
     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
                                                      close_data);
 
-  if (!worker->output_pending)
+  /* If we had output pending, the next bit of output will happen
+   * automatically when it finishes, so we only need to do this
+   * if nothing was pending.
+   *
+   * The idle callback will re-check that output_pending is still
+   * PENDING_NONE, to guard against output starting before the idle.
+   */
+  if (worker->output_pending == PENDING_NONE)
     {
       GSource *idle_source;
       idle_source = g_idle_source_new ();
       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
       g_source_set_callback (idle_source,
-                             write_message_in_idle_cb,
+                             continue_writing_in_idle_cb,
                              _g_dbus_worker_ref (worker),
                              (GDestroyNotify) _g_dbus_worker_unref);
+      g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
       g_source_attach (idle_source, worker->shared_thread_data->context);
       g_source_unref (idle_source);
     }
-
-  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1520,7 +1746,7 @@ schedule_write_in_worker_thread (GDBusWorker        *worker,
 /* can be called from any thread - steals blob
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_send_message (GDBusWorker    *worker,
@@ -1540,7 +1766,9 @@ _g_dbus_worker_send_message (GDBusWorker    *worker,
   data->blob = blob; /* steal! */
   data->blob_size = blob_len;
 
-  schedule_write_in_worker_thread (worker, data, NULL);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, data, NULL, NULL);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1573,7 +1801,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->stream = g_object_ref (stream);
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
-  worker->output_pending = FALSE;
+  worker->output_pending = PENDING_NONE;
 
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
@@ -1584,6 +1812,11 @@ _g_dbus_worker_new (GIOStream                              *stream,
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+#endif
+
   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
 
   /* begin reading */
@@ -1593,6 +1826,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
                          _g_dbus_worker_do_initial_read,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 
@@ -1604,7 +1838,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
 /* can be called from any thread
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_close (GDBusWorker         *worker,
@@ -1619,8 +1853,13 @@ _g_dbus_worker_close (GDBusWorker         *worker,
       (cancellable == NULL ? NULL : g_object_ref (cancellable));
   close_data->result = (result == NULL ? NULL : g_object_ref (result));
 
+  /* Don't set worker->close_expected here - we're in the wrong thread.
+   * It'll be set before the actual close happens.
+   */
   g_cancellable_cancel (worker->cancellable);
-  schedule_write_in_worker_thread (worker, NULL, close_data);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, NULL, NULL, close_data);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* This can be called from any thread - frees worker. Note that
@@ -1628,7 +1867,7 @@ _g_dbus_worker_close (GDBusWorker         *worker,
  * worker - use your own synchronization primitive in the callbacks.
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
@@ -1640,7 +1879,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
    */
   _g_dbus_worker_close (worker, NULL, NULL);
 
-  /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+  /* _g_dbus_worker_close holds a ref until after an idle in the worker
    * thread has run, so we no longer need to unref in an idle like in
    * commit 322e25b535
    */
@@ -1654,7 +1893,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
  * the transport has been flushed
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 gboolean
 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
@@ -1663,20 +1902,34 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
 {
   gboolean ret;
   FlushData *data;
+  guint64 pending_writes;
 
   data = NULL;
   ret = TRUE;
 
-  /* if the queue is empty, there's nothing to wait for */
   g_mutex_lock (&worker->write_lock);
-  if (g_queue_get_length (worker->write_queue) > 0)
+
+  /* if the queue is empty, no write is in-flight and we haven't written
+   * anything since the last flush, then there's nothing to wait for
+   */
+  pending_writes = g_queue_get_length (worker->write_queue);
+
+  /* if a write is in-flight, we shouldn't be satisfied until the first
+   * flush operation that follows it
+   */
+  if (worker->output_pending == PENDING_WRITE)
+    pending_writes += 1;
+
+  if (pending_writes > 0 ||
+      worker->write_num_messages_written != worker->write_num_messages_flushed)
     {
       data = g_new0 (FlushData, 1);
       g_mutex_init (&data->mutex);
       g_cond_init (&data->cond);
-      data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
+      data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
       g_mutex_lock (&data->mutex);
-      worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+
+      schedule_writing_unlocked (worker, NULL, data, NULL);
     }
   g_mutex_unlock (&worker->write_lock);
 
@@ -1687,7 +1940,7 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
 
       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
       g_cond_clear (&data->cond);
-      g_mutex_free (&data->mutex);
+      g_mutex_clear (&data->mutex);
       if (data->error != NULL)
         {
           ret = FALSE;
@@ -1798,7 +2051,7 @@ _g_dbus_debug_print_unlock (void)
   G_UNLOCK (print_lock);
 }
 
-/*
+/**
  * _g_dbus_initialize:
  *
  * Does various one-time init things such as
@@ -1943,22 +2196,73 @@ out:
 gchar *
 _g_dbus_get_machine_id (GError **error)
 {
+#ifdef G_OS_WIN32
+  HW_PROFILE_INFOA info;
+  char *src, *dest, *res;
+  int i;
+
+  if (!GetCurrentHwProfileA (&info))
+    {
+      char *message = g_win32_error_message (GetLastError ());
+      g_set_error (error,
+                  G_IO_ERROR,
+                  G_IO_ERROR_FAILED,
+                  _("Unable to get Hardware profile: %s"), message);
+      g_free (message);
+      return NULL;
+    }
+
+  /* Form: {12340001-4980-1920-6788-123456789012} */
+  src = &info.szHwProfileGuid[0];
+
+  res = g_malloc (32+1);
+  dest = res;
+
+  src++; /* Skip { */
+  for (i = 0; i < 8; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 12; i++)
+    *dest++ = *src++;
+  *dest = 0;
+
+  return res;
+#else
   gchar *ret;
+  GError *first_error;
   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
   ret = NULL;
+  first_error = NULL;
   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
                             &ret,
                             NULL,
-                            error))
+                            &first_error) &&
+      !g_file_get_contents ("/etc/machine-id",
+                            &ret,
+                            NULL,
+                            NULL))
     {
-      g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+      g_propagate_prefixed_error (error, first_error,
+                                  _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
     }
   else
     {
+      /* ignore the error from the first try, if any */
+      g_clear_error (&first_error);
       /* TODO: validate value */
       g_strstrip (ret);
     }
   return ret;
+#endif
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1992,7 +2296,7 @@ write_message_print_transport_debug (gssize bytes_written,
   _g_dbus_debug_print_lock ();
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
-           "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+           "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
            bytes_written,
            g_dbus_message_get_serial (data->message),
@@ -2043,7 +2347,7 @@ read_message_print_transport_debug (gssize bytes_read,
     _g_dbus_debug_print_lock ();
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
-           "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+           "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
            bytes_read,
            serial,