schedule_write_in_worker_thread: require caller to lock; rename accordingly
[platform/upstream/glib.git] / gio / gdbusprivate.c
index aa2a16e..6257b35 100644 (file)
@@ -287,7 +287,6 @@ static SharedThreadData *
 _g_dbus_shared_thread_ref (void)
 {
   static gsize shared_thread_data = 0;
-  GError *error = NULL;
   SharedThreadData *ret;
 
   if (g_once_init_enter (&shared_thread_data))
@@ -302,11 +301,9 @@ _g_dbus_shared_thread_ref (void)
       
       data->context = g_main_context_new ();
       data->loop = g_main_loop_new (data->context, FALSE);
-      data->thread = g_thread_create (gdbus_shared_thread_func,
-                                     data,
-                                     TRUE,
-                                     &error);
-      g_assert_no_error (error);
+      data->thread = g_thread_new ("gdbus",
+                                   gdbus_shared_thread_func,
+                                   data);
       /* We can cast between gsize and gpointer safely */
       g_once_init_leave (&shared_thread_data, (gsize) data);
     }
@@ -334,13 +331,21 @@ _g_dbus_shared_thread_unref (SharedThreadData *data)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+typedef enum {
+    PENDING_NONE = 0,
+    PENDING_WRITE,
+    PENDING_FLUSH,
+    PENDING_CLOSE
+} OutputPending;
+
 struct GDBusWorker
 {
   volatile gint                       ref_count;
 
   SharedThreadData                   *shared_thread_data;
 
-  gboolean                            stopped;
+  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+  volatile gint                       stopped;
 
   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
    * only affects messages received from the other peer (since GDBusServer is the
@@ -361,7 +366,7 @@ struct GDBusWorker
   GSocket *socket;
 
   /* used for reading */
-  GMutex                             *read_lock;
+  GMutex                              read_lock;
   gchar                              *read_buffer;
   gsize                               read_buffer_allocated_size;
   gsize                               read_buffer_cur_size;
@@ -370,21 +375,34 @@ struct GDBusWorker
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
 
+  /* Whether an async write, flush or close, or none of those, is pending.
+   * Only the worker thread may change its value, and only with the write_lock.
+   * Other threads may read its value when holding the write_lock.
+   * The worker thread may read its value at any time.
+   */
+  OutputPending                       output_pending;
   /* used for writing */
-  gint                                num_writes_pending;
-  GMutex                             *write_lock;
+  GMutex                              write_lock;
+  /* queue of MessageToWriteData, protected by write_lock */
   GQueue                             *write_queue;
+  /* protected by write_lock */
   guint64                             write_num_messages_written;
+  /* list of FlushData, protected by write_lock */
   GList                              *write_pending_flushes;
-  gboolean                            flush_pending;
+  /* list of CloseData, protected by write_lock */
+  GList                              *pending_close_attempts;
+  /* no lock - only used from the worker thread */
+  gboolean                            close_expected;
 };
 
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 typedef struct
 {
-  GMutex *mutex;
-  GCond *cond;
+  GMutex  mutex;
+  GCond   cond;
   guint64 number_to_wait_for;
   GError *error;
 } FlushData;
@@ -400,6 +418,24 @@ static void read_message_print_transport_debug (gssize bytes_read,
 static void write_message_print_transport_debug (gssize bytes_written,
                                                  MessageToWriteData *data);
 
+typedef struct {
+    GDBusWorker *worker;
+    GCancellable *cancellable;
+    GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+  if (close_data->cancellable != NULL)
+    g_object_unref (close_data->cancellable);
+
+  if (close_data->result != NULL)
+    g_object_unref (close_data->result);
+
+  _g_dbus_worker_unref (close_data->worker);
+  g_slice_free (CloseData, close_data);
+}
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 static GDBusWorker *
@@ -420,7 +456,7 @@ _g_dbus_worker_unref (GDBusWorker *worker)
 
       g_object_unref (worker->stream);
 
-      g_mutex_free (worker->read_lock);
+      g_mutex_clear (&worker->read_lock);
       g_object_unref (worker->cancellable);
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
@@ -428,7 +464,7 @@ _g_dbus_worker_unref (GDBusWorker *worker)
       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
       g_queue_free (worker->received_messages_while_frozen);
 
-      g_mutex_free (worker->write_lock);
+      g_mutex_clear (&worker->write_lock);
       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
       g_queue_free (worker->write_queue);
 
@@ -443,7 +479,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
                                   gboolean      remote_peer_vanished,
                                   GError       *error)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
 }
 
@@ -451,7 +487,7 @@ static void
 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
                                       GDBusMessage *message)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->message_received_callback (worker, message, worker->user_data);
 }
 
@@ -460,7 +496,7 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
                                               GDBusMessage *message)
 {
   GDBusMessage *ret;
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
   else
     ret = message;
@@ -492,7 +528,7 @@ unfreeze_in_idle_cb (gpointer user_data)
   GDBusWorker *worker = user_data;
   GDBusMessage *message;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
   if (worker->frozen)
     {
       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
@@ -506,7 +542,7 @@ unfreeze_in_idle_cb (gpointer user_data)
     {
       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
     }
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
   return FALSE;
 }
 
@@ -539,10 +575,10 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
   GError *error;
   gssize bytes_read;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
 
   /* If already stopped, don't even process the reply */
-  if (worker->stopped)
+  if (g_atomic_int_get (&worker->stopped))
     goto out;
 
   error = NULL;
@@ -620,7 +656,40 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
   if (bytes_read == -1)
     {
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
+        {
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- READ ERROR on stream of type %s:\n"
+                   "  ---- %s %d: %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+                   g_quark_to_string (error->domain), error->code,
+                   error->message);
+          _g_dbus_debug_print_unlock ();
+        }
+
+      /* Every async read that uses this callback uses worker->cancellable
+       * as its GCancellable. worker->cancellable gets cancelled if and only
+       * if the GDBusConnection tells us to close (either via
+       * _g_dbus_worker_stop, which is called on last-unref, or directly),
+       * so a cancelled read must mean our connection was closed locally.
+       *
+       * If we're closing, other errors are possible - notably,
+       * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+       * read in-flight. It seems sensible to treat all read errors during
+       * closing as an expected thing that doesn't trip exit-on-close.
+       *
+       * Because close_expected can't be set until we get into the worker
+       * thread, but the cancellable is signalled sooner (from another
+       * thread), we do still need to check the error.
+       */
+      if (worker->close_expected ||
+          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+      else
+        _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
       g_error_free (error);
       goto out;
     }
@@ -746,7 +815,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     }
 
  out:
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
 
   /* gives up the reference acquired when calling g_input_stream_read_async() */
   _g_dbus_worker_unref (worker);
@@ -756,6 +825,10 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 static void
 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 {
+  /* Note that we do need to keep trying to read even if close_expected is
+   * true, because only failing a read causes us to signal 'closed'.
+   */
+
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -800,9 +873,9 @@ static gboolean
 _g_dbus_worker_do_initial_read (gpointer data)
 {
   GDBusWorker *worker = data;
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
   _g_dbus_worker_do_read_unlocked (worker);
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
   return FALSE;
 }
 
@@ -834,7 +907,11 @@ message_to_write_data_free (MessageToWriteData *data)
 
 static void write_message_continue_writing (MessageToWriteData *data);
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
 static void
 write_message_async_cb (GObject      *source_object,
                         GAsyncResult *res,
@@ -880,7 +957,11 @@ write_message_async_cb (GObject      *source_object,
   ;
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
 static gboolean
 on_socket_ready (GSocket      *socket,
                  GIOCondition  condition,
@@ -891,7 +972,11 @@ on_socket_ready (GSocket      *socket,
   return FALSE; /* remove source */
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
 static void
 write_message_continue_writing (MessageToWriteData *data)
 {
@@ -1024,7 +1109,11 @@ write_message_continue_writing (MessageToWriteData *data)
   ;
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
 static void
 write_message_async (GDBusWorker         *worker,
                      MessageToWriteData  *data,
@@ -1052,7 +1141,7 @@ write_message_finish (GAsyncResult   *res,
 }
 /* ---------------------------------------------------------------------------------------------------- */
 
-static void maybe_write_next_message (GDBusWorker *worker);
+static void continue_writing (GDBusWorker *worker);
 
 typedef struct
 {
@@ -1060,7 +1149,29 @@ typedef struct
   GList *flushers;
 } FlushAsyncData;
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+flush_data_list_complete (const GList  *flushers,
+                          const GError *error)
+{
+  const GList *l;
+
+  for (l = flushers; l != NULL; l = l->next)
+    {
+      FlushData *f = l->data;
+
+      f->error = error != NULL ? g_error_copy (error) : NULL;
+
+      g_mutex_lock (&f->mutex);
+      g_cond_signal (&f->cond);
+      g_mutex_unlock (&f->mutex);
+    }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_FLUSH on entry
+ */
 static void
 ostream_flush_cb (GObject      *source_object,
                   GAsyncResult *res,
@@ -1068,7 +1179,6 @@ ostream_flush_cb (GObject      *source_object,
 {
   FlushAsyncData *data = user_data;
   GError *error;
-  GList *l;
 
   error = NULL;
   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
@@ -1089,16 +1199,7 @@ ostream_flush_cb (GObject      *source_object,
     }
 
   g_assert (data->flushers != NULL);
-  for (l = data->flushers; l != NULL; l = l->next)
-    {
-      FlushData *f = l->data;
-
-      f->error = error != NULL ? g_error_copy (error) : NULL;
-
-      g_mutex_lock (f->mutex);
-      g_cond_signal (f->cond);
-      g_mutex_unlock (f->mutex);
-    }
+  flush_data_list_complete (data->flushers, error);
   g_list_free (data->flushers);
 
   if (error != NULL)
@@ -1106,18 +1207,23 @@ ostream_flush_cb (GObject      *source_object,
 
   /* Make sure we tell folks that we don't have additional
      flushes pending */
-  g_mutex_lock (data->worker->write_lock);
-  data->worker->flush_pending = FALSE;
-  g_mutex_unlock (data->worker->write_lock);
+  g_mutex_lock (&data->worker->write_lock);
+  g_assert (data->worker->output_pending == PENDING_FLUSH);
+  data->worker->output_pending = PENDING_NONE;
+  g_mutex_unlock (&data->worker->write_lock);
 
   /* OK, cool, finally kick off the next write */
-  maybe_write_next_message (data->worker);
+  continue_writing (data->worker);
 
   _g_dbus_worker_unref (data->worker);
   g_free (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_NONE on entry
+ */
 static void
 message_written (GDBusWorker *worker,
                  MessageToWriteData *message_data)
@@ -1149,7 +1255,7 @@ message_written (GDBusWorker *worker,
 
   /* then first wake up pending flushes and, if needed, flush the stream */
   flushers = NULL;
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
   worker->write_num_messages_written += 1;
   for (l = worker->write_pending_flushes; l != NULL; l = ll)
     {
@@ -1164,9 +1270,10 @@ message_written (GDBusWorker *worker,
     }
   if (flushers != NULL)
     {
-      worker->flush_pending = TRUE;
+      g_assert (worker->output_pending == PENDING_NONE);
+      worker->output_pending = PENDING_FLUSH;
     }
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   if (flushers != NULL)
     {
@@ -1184,11 +1291,15 @@ message_written (GDBusWorker *worker,
   else
     {
       /* kick off the next write! */
-      maybe_write_next_message (worker);
+      continue_writing (worker);
     }
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_WRITE on entry
+ */
 static void
 write_message_cb (GObject       *source_object,
                   GAsyncResult  *res,
@@ -1197,9 +1308,10 @@ write_message_cb (GObject       *source_object,
   MessageToWriteData *data = user_data;
   GError *error;
 
-  g_mutex_lock (data->worker->write_lock);
-  data->worker->num_writes_pending -= 1;
-  g_mutex_unlock (data->worker->write_lock);
+  g_mutex_lock (&data->worker->write_lock);
+  g_assert (data->worker->output_pending == PENDING_WRITE);
+  data->worker->output_pending = PENDING_NONE;
+  g_mutex_unlock (&data->worker->write_lock);
 
   error = NULL;
   if (!write_message_finish (res, &error))
@@ -1218,22 +1330,115 @@ write_message_cb (GObject       *source_object,
   message_to_write_data_free (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is PENDING_CLOSE on entry
+ */
 static void
-maybe_write_next_message (GDBusWorker *worker)
+iostream_close_cb (GObject      *source_object,
+                   GAsyncResult *res,
+                   gpointer      user_data)
+{
+  GDBusWorker *worker = user_data;
+  GError *error = NULL;
+  GList *pending_close_attempts, *pending_flush_attempts;
+  GQueue *send_queue;
+
+  g_io_stream_close_finish (worker->stream, res, &error);
+
+  g_mutex_lock (&worker->write_lock);
+
+  pending_close_attempts = worker->pending_close_attempts;
+  worker->pending_close_attempts = NULL;
+
+  pending_flush_attempts = worker->write_pending_flushes;
+  worker->write_pending_flushes = NULL;
+
+  send_queue = worker->write_queue;
+  worker->write_queue = g_queue_new ();
+
+  g_assert (worker->output_pending == PENDING_CLOSE);
+  worker->output_pending = PENDING_NONE;
+
+  g_mutex_unlock (&worker->write_lock);
+
+  while (pending_close_attempts != NULL)
+    {
+      CloseData *close_data = pending_close_attempts->data;
+
+      pending_close_attempts = g_list_delete_link (pending_close_attempts,
+                                                   pending_close_attempts);
+
+      if (close_data->result != NULL)
+        {
+          if (error != NULL)
+            g_simple_async_result_set_from_error (close_data->result, error);
+
+          /* this must be in an idle because the result is likely to be
+           * intended for another thread
+           */
+          g_simple_async_result_complete_in_idle (close_data->result);
+        }
+
+      close_data_free (close_data);
+    }
+
+  g_clear_error (&error);
+
+  /* all messages queued for sending are discarded */
+  g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+  g_queue_free (send_queue);
+
+  /* all queued flushes fail */
+  error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+                       _("Operation was cancelled"));
+  flush_data_list_complete (pending_flush_attempts, error);
+  g_list_free (pending_flush_attempts);
+  g_clear_error (&error);
+
+  _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be PENDING_NONE on entry
+ */
+static void
+continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
 
  write_next:
+  /* we mustn't try to write two things at once */
+  g_assert (worker->output_pending == PENDING_NONE);
 
-  g_mutex_lock (worker->write_lock);
-  data = g_queue_pop_head (worker->write_queue);
-  if (data != NULL)
-    worker->num_writes_pending += 1;
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
+
+  /* if we want to close the connection, that takes precedence */
+  if (worker->pending_close_attempts != NULL)
+    {
+      worker->close_expected = TRUE;
+      worker->output_pending = PENDING_CLOSE;
+
+      g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+                               NULL, iostream_close_cb,
+                               _g_dbus_worker_ref (worker));
+      data = NULL;
+    }
+  else
+    {
+      data = g_queue_pop_head (worker->write_queue);
+
+      if (data != NULL)
+        worker->output_pending = PENDING_WRITE;
+    }
+
+  g_mutex_unlock (&worker->write_lock);
 
   /* Note that write_lock is only used for protecting the @write_queue
-   * and @num_writes_pending fields of the GDBusWorker struct ... which we
+   * and @output_pending fields of the GDBusWorker struct ... which we
    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
    *
    * Therefore, it's fine to drop it here when calling back into user
@@ -1256,9 +1461,9 @@ maybe_write_next_message (GDBusWorker *worker)
       else if (data->message == NULL)
         {
           /* filters dropped message */
-          g_mutex_lock (worker->write_lock);
-          worker->num_writes_pending -= 1;
-          g_mutex_unlock (worker->write_lock);
+          g_mutex_lock (&worker->write_lock);
+          worker->output_pending = PENDING_NONE;
+          g_mutex_unlock (&worker->write_lock);
           message_to_write_data_free (data);
           goto write_next;
         }
@@ -1295,19 +1500,67 @@ maybe_write_next_message (GDBusWorker *worker)
     }
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be anything
+ */
 static gboolean
-write_message_in_idle_cb (gpointer user_data)
+continue_writing_in_idle_cb (gpointer user_data)
 {
   GDBusWorker *worker = user_data;
-  if (worker->num_writes_pending == 0 && !worker->flush_pending)
-    maybe_write_next_message (worker);
+
+  /* Because this is the worker thread, we can read this struct member
+   * without holding the lock: no other thread ever modifies it.
+   */
+  if (worker->output_pending == PENDING_NONE)
+    continue_writing (worker);
+
   return FALSE;
 }
 
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is held on entry
+ * output_pending may be anything
+ */
+static void
+schedule_writing_unlocked (GDBusWorker        *worker,
+                           MessageToWriteData *write_data,
+                           CloseData          *close_data)
+{
+  if (write_data != NULL)
+    g_queue_push_tail (worker->write_queue, write_data);
+
+  if (close_data != NULL)
+    worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+                                                     close_data);
+
+  if (worker->output_pending == PENDING_NONE)
+    {
+      GSource *idle_source;
+      idle_source = g_idle_source_new ();
+      g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+      g_source_set_callback (idle_source,
+                             continue_writing_in_idle_cb,
+                             _g_dbus_worker_ref (worker),
+                             (GDestroyNotify) _g_dbus_worker_unref);
+      g_source_attach (idle_source, worker->shared_thread_data->context);
+      g_source_unref (idle_source);
+    }
+}
+
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* can be called from any thread - steals blob */
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
 void
 _g_dbus_worker_send_message (GDBusWorker    *worker,
                              GDBusMessage   *message,
@@ -1326,21 +1579,9 @@ _g_dbus_worker_send_message (GDBusWorker    *worker,
   data->blob = blob; /* steal! */
   data->blob_size = blob_len;
 
-  g_mutex_lock (worker->write_lock);
-  g_queue_push_tail (worker->write_queue, data);
-  if (worker->num_writes_pending == 0)
-    {
-      GSource *idle_source;
-      idle_source = g_idle_source_new ();
-      g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
-      g_source_set_callback (idle_source,
-                             write_message_in_idle_cb,
-                             _g_dbus_worker_ref (worker),
-                             (GDestroyNotify) _g_dbus_worker_unref);
-      g_source_attach (idle_source, worker->shared_thread_data->context);
-      g_source_unref (idle_source);
-    }
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, data, NULL);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1365,7 +1606,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker = g_new0 (GDBusWorker, 1);
   worker->ref_count = 1;
 
-  worker->read_lock = g_mutex_new ();
+  g_mutex_init (&worker->read_lock);
   worker->message_received_callback = message_received_callback;
   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
   worker->disconnected_callback = disconnected_callback;
@@ -1373,12 +1614,12 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->stream = g_object_ref (stream);
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
-  worker->flush_pending = FALSE;
+  worker->output_pending = PENDING_NONE;
 
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
 
-  worker->write_lock = g_mutex_new ();
+  g_mutex_init (&worker->write_lock);
   worker->write_queue = g_queue_new ();
 
   if (G_IS_SOCKET_CONNECTION (worker->stream))
@@ -1391,8 +1632,8 @@ _g_dbus_worker_new (GIOStream                              *stream,
   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
   g_source_set_callback (idle_source,
                          _g_dbus_worker_do_initial_read,
-                         worker,
-                         NULL);
+                         _g_dbus_worker_ref (worker),
+                         (GDestroyNotify) _g_dbus_worker_unref);
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 
@@ -1401,35 +1642,55 @@ _g_dbus_worker_new (GIOStream                              *stream,
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static gboolean
-unref_in_idle_cb (gpointer user_data)
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
+ */
+void
+_g_dbus_worker_close (GDBusWorker         *worker,
+                      GCancellable        *cancellable,
+                      GSimpleAsyncResult  *result)
 {
-  GDBusWorker *worker = user_data;
-  _g_dbus_worker_unref (worker);
-  return FALSE;
+  CloseData *close_data;
+
+  close_data = g_slice_new0 (CloseData);
+  close_data->worker = _g_dbus_worker_ref (worker);
+  close_data->cancellable =
+      (cancellable == NULL ? NULL : g_object_ref (cancellable));
+  close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+  /* Don't set worker->close_expected here - we're in the wrong thread.
+   * It'll be set before the actual close happens.
+   */
+  g_cancellable_cancel (worker->cancellable);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, NULL, close_data);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* This can be called from any thread - frees worker. Note that
  * callbacks might still happen if called from another thread than the
  * worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
 {
-  GSource *idle_source;
+  g_atomic_int_set (&worker->stopped, TRUE);
 
-  worker->stopped = TRUE;
-  g_cancellable_cancel (worker->cancellable);
+  /* Cancel any pending operations and schedule a close of the underlying I/O
+   * stream in the worker thread
+   */
+  _g_dbus_worker_close (worker, NULL, NULL);
 
-  idle_source = g_idle_source_new ();
-  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
-  g_source_set_callback (idle_source,
-                         unref_in_idle_cb,
-                         _g_dbus_worker_ref (worker),
-                         (GDestroyNotify) _g_dbus_worker_unref);
-  g_source_attach (idle_source, worker->shared_thread_data->context);
-  g_source_unref (idle_source);
+  /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+   * thread has run, so we no longer need to unref in an idle like in
+   * commit 322e25b535
+   */
+  _g_dbus_worker_unref (worker);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1437,6 +1698,9 @@ _g_dbus_worker_stop (GDBusWorker *worker)
 /* can be called from any thread (except the worker thread) - blocks
  * calling thread until all queued outgoing messages are written and
  * the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be anything
  */
 gboolean
 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
@@ -1450,26 +1714,26 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
   ret = TRUE;
 
   /* if the queue is empty, there's nothing to wait for */
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
   if (g_queue_get_length (worker->write_queue) > 0)
     {
       data = g_new0 (FlushData, 1);
-      data->mutex = g_mutex_new ();
-      data->cond = g_cond_new ();
+      g_mutex_init (&data->mutex);
+      g_cond_init (&data->cond);
       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
-      g_mutex_lock (data->mutex);
+      g_mutex_lock (&data->mutex);
       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
     }
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   if (data != NULL)
     {
-      g_cond_wait (data->cond, data->mutex);
-      g_mutex_unlock (data->mutex);
+      g_cond_wait (&data->cond, &data->mutex);
+      g_mutex_unlock (&data->mutex);
 
       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
-      g_cond_free (data->cond);
-      g_mutex_free (data->mutex);
+      g_cond_clear (&data->cond);
+      g_mutex_clear (&data->mutex);
       if (data->error != NULL)
         {
           ret = FALSE;
@@ -1726,17 +1990,26 @@ gchar *
 _g_dbus_get_machine_id (GError **error)
 {
   gchar *ret;
+  GError *first_error;
   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
   ret = NULL;
+  first_error = NULL;
   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
                             &ret,
                             NULL,
-                            error))
+                            &first_error) &&
+      !g_file_get_contents ("/etc/machine-id",
+                            &ret,
+                            NULL,
+                            NULL))
     {
-      g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+      g_propagate_prefixed_error (error, first_error,
+                                  _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
     }
   else
     {
+      /* ignore the error from the first try, if any */
+      g_clear_error (&first_error);
       /* TODO: validate value */
       g_strstrip (ret);
     }