GDBusWorker: as a precaution, access 'stopped' boolean atomically
[platform/upstream/glib.git] / gio / gdbusprivate.c
index d4d7a0b..4b2243d 100644 (file)
@@ -37,6 +37,7 @@
 #include "gasyncresult.h"
 #include "gsimpleasyncresult.h"
 #include "ginputstream.h"
+#include "gmemoryinputstream.h"
 #include "giostream.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
@@ -54,6 +55,8 @@
 
 #include "glibintl.h"
 
+static gboolean _g_dbus_worker_do_initial_read (gpointer data);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 gchar *
@@ -152,8 +155,7 @@ _g_socket_read_with_control_messages_ready (GSocket      *socket,
   else
     {
       g_assert (error != NULL);
-      g_simple_async_result_set_from_error (data->simple, error);
-      g_error_free (error);
+      g_simple_async_result_take_error (data->simple, error);
     }
 
   if (data->from_mainloop)
@@ -229,119 +231,103 @@ _g_socket_read_with_control_messages_finish (GSocket       *socket,
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-G_LOCK_DEFINE_STATIC (shared_thread_lock);
+/* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+
+static GPtrArray *ensured_classes = NULL;
+
+static void
+ensure_type (GType gtype)
+{
+  g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
+}
+
+static void
+release_required_types (void)
+{
+  g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
+  g_ptr_array_unref (ensured_classes);
+  ensured_classes = NULL;
+}
+
+static void
+ensure_required_types (void)
+{
+  g_assert (ensured_classes == NULL);
+  ensured_classes = g_ptr_array_new ();
+  ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
+  ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
+}
+/* ---------------------------------------------------------------------------------------------------- */
 
 typedef struct
 {
-  gint num_users;
+  volatile gint refcount;
   GThread *thread;
   GMainContext *context;
   GMainLoop *loop;
 } SharedThreadData;
 
-static SharedThreadData *shared_thread_data = NULL;
-
 static gpointer
-shared_thread_func (gpointer data)
+gdbus_shared_thread_func (gpointer user_data)
 {
-  g_main_context_push_thread_default (shared_thread_data->context);
-  g_main_loop_run (shared_thread_data->loop);
-  g_main_context_pop_thread_default (shared_thread_data->context);
-  return NULL;
-}
+  SharedThreadData *data = user_data;
 
-typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
+  g_main_context_push_thread_default (data->context);
+  g_main_loop_run (data->loop);
+  g_main_context_pop_thread_default (data->context);
 
-typedef struct
-{
-  GDBusSharedThreadFunc func;
-  gpointer              user_data;
-  gboolean              done;
-} CallerData;
+  release_required_types ();
 
-static gboolean
-invoke_caller (gpointer user_data)
-{
-  CallerData *data = user_data;
-  data->func (data->user_data);
-  data->done = TRUE;
-  return FALSE;
+  return NULL;
 }
 
-static void
-_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
-                           gpointer              user_data)
-{
-  GError *error;
-  GSource *idle_source;
-  CallerData *data;
+/* ---------------------------------------------------------------------------------------------------- */
 
-  G_LOCK (shared_thread_lock);
+static SharedThreadData *
+_g_dbus_shared_thread_ref (void)
+{
+  static gsize shared_thread_data = 0;
+  GError *error = NULL;
+  SharedThreadData *ret;
 
-  if (shared_thread_data != NULL)
+  if (g_once_init_enter (&shared_thread_data))
     {
-      shared_thread_data->num_users += 1;
-      goto have_thread;
+      SharedThreadData *data;
+
+      /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+      ensure_required_types ();
+
+      data = g_new0 (SharedThreadData, 1);
+      data->refcount = 0;
+      
+      data->context = g_main_context_new ();
+      data->loop = g_main_loop_new (data->context, FALSE);
+      data->thread = g_thread_create (gdbus_shared_thread_func,
+                                     data,
+                                     TRUE,
+                                     &error);
+      g_assert_no_error (error);
+      /* We can cast between gsize and gpointer safely */
+      g_once_init_leave (&shared_thread_data, (gsize) data);
     }
 
-  shared_thread_data = g_new0 (SharedThreadData, 1);
-  shared_thread_data->num_users = 1;
-
-  error = NULL;
-  shared_thread_data->context = g_main_context_new ();
-  shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
-  shared_thread_data->thread = g_thread_create (shared_thread_func,
-                                                NULL,
-                                                TRUE,
-                                                &error);
-  g_assert_no_error (error);
-
- have_thread:
-
-  data = g_new0 (CallerData, 1);
-  data->func = func;
-  data->user_data = user_data;
-  data->done = FALSE;
-
-  idle_source = g_idle_source_new ();
-  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
-  g_source_set_callback (idle_source,
-                         invoke_caller,
-                         data,
-                         NULL);
-  g_source_attach (idle_source, shared_thread_data->context);
-  g_source_unref (idle_source);
-
-  /* wait for the user code to run.. hmm.. probably use a condition variable instead */
-  while (!data->done)
-    g_thread_yield ();
-
-  g_free (data);
-
-  G_UNLOCK (shared_thread_lock);
+  ret = (SharedThreadData*) shared_thread_data;
+  g_atomic_int_inc (&ret->refcount);
+  return ret;
 }
 
 static void
-_g_dbus_shared_thread_unref (void)
+_g_dbus_shared_thread_unref (SharedThreadData *data)
 {
   /* TODO: actually destroy the shared thread here */
 #if 0
-  G_LOCK (shared_thread_lock);
-  g_assert (shared_thread_data != NULL);
-  shared_thread_data->num_users -= 1;
-  if (shared_thread_data->num_users == 0)
-    {
-      g_main_loop_quit (shared_thread_data->loop);
-      //g_thread_join (shared_thread_data->thread);
-      g_main_loop_unref (shared_thread_data->loop);
-      g_main_context_unref (shared_thread_data->context);
-      g_free (shared_thread_data);
-      shared_thread_data = NULL;
-      G_UNLOCK (shared_thread_lock);
-    }
-  else
+  g_assert (data != NULL);
+  if (g_atomic_int_dec_and_test (&data->refcount))
     {
-      G_UNLOCK (shared_thread_lock);
+      g_main_loop_quit (data->loop);
+      //g_thread_join (data->thread);
+      g_main_loop_unref (data->loop);
+      g_main_context_unref (data->context);
     }
 #endif
 }
@@ -352,25 +338,26 @@ struct GDBusWorker
 {
   volatile gint                       ref_count;
 
-  gboolean                            stopped;
+  SharedThreadData                   *shared_thread_data;
+
+  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+  volatile gint                       stopped;
 
   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
    * only affects messages received from the other peer (since GDBusServer is the
    * only user) - we might want it to affect messages sent to the other peer too?
    */
   gboolean                            frozen;
+  GDBusCapabilityFlags                capabilities;
   GQueue                             *received_messages_while_frozen;
 
   GIOStream                          *stream;
-  GDBusCapabilityFlags                capabilities;
   GCancellable                       *cancellable;
   GDBusWorkerMessageReceivedCallback  message_received_callback;
   GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  GThread                            *thread;
-
   /* if not NULL, stream is GSocketConnection */
   GSocket *socket;
 
@@ -384,14 +371,26 @@ struct GDBusWorker
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
 
+  /* TRUE if an async write, flush or close is pending.
+   * Only the worker thread may change its value, and only with the write_lock.
+   * Other threads may read its value when holding the write_lock.
+   * The worker thread may read its value at any time.
+   */
+  gboolean                            output_pending;
   /* used for writing */
   GMutex                             *write_lock;
+  /* queue of MessageToWriteData, protected by write_lock */
   GQueue                             *write_queue;
-  gint                                num_writes_pending;
+  /* protected by write_lock */
   guint64                             write_num_messages_written;
+  /* list of FlushData, protected by write_lock */
   GList                              *write_pending_flushes;
+  /* list of CloseData, protected by write_lock */
+  GList                              *pending_close_attempts;
 };
 
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 typedef struct
@@ -413,6 +412,24 @@ static void read_message_print_transport_debug (gssize bytes_read,
 static void write_message_print_transport_debug (gssize bytes_written,
                                                  MessageToWriteData *data);
 
+typedef struct {
+    GDBusWorker *worker;
+    GCancellable *cancellable;
+    GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+  if (close_data->cancellable != NULL)
+    g_object_unref (close_data->cancellable);
+
+  if (close_data->result != NULL)
+    g_object_unref (close_data->result);
+
+  _g_dbus_worker_unref (close_data->worker);
+  g_slice_free (CloseData, close_data);
+}
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 static GDBusWorker *
@@ -429,7 +446,7 @@ _g_dbus_worker_unref (GDBusWorker *worker)
     {
       g_assert (worker->write_pending_flushes == NULL);
 
-      _g_dbus_shared_thread_unref ();
+      _g_dbus_shared_thread_unref (worker->shared_thread_data);
 
       g_object_unref (worker->stream);
 
@@ -456,7 +473,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
                                   gboolean      remote_peer_vanished,
                                   GError       *error)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
 }
 
@@ -464,18 +481,19 @@ static void
 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
                                       GDBusMessage *message)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->message_received_callback (worker, message, worker->user_data);
 }
 
-static gboolean
+static GDBusMessage *
 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
                                               GDBusMessage *message)
 {
-  gboolean ret;
-  ret = FALSE;
-  if (!worker->stopped)
+  GDBusMessage *ret;
+  if (!g_atomic_int_get (&worker->stopped))
     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+  else
+    ret = message;
   return ret;
 }
 
@@ -533,7 +551,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker)
                          unfreeze_in_idle_cb,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
-  g_source_attach (idle_source, shared_thread_data->context);
+  g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 }
 
@@ -554,7 +572,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
   g_mutex_lock (worker->read_lock);
 
   /* If already stopped, don't even process the reply */
-  if (worker->stopped)
+  if (g_atomic_int_get (&worker->stopped))
     goto out;
 
   error = NULL;
@@ -717,6 +735,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
           if (worker->read_fd_list != NULL)
             {
               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
+              g_object_unref (worker->read_fd_list);
               worker->read_fd_list = NULL;
             }
 #endif
@@ -807,12 +826,14 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 }
 
 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static void
-_g_dbus_worker_do_read (GDBusWorker *worker)
+static gboolean
+_g_dbus_worker_do_initial_read (gpointer data)
 {
+  GDBusWorker *worker = data;
   g_mutex_lock (worker->read_lock);
   _g_dbus_worker_do_read_unlocked (worker);
   g_mutex_unlock (worker->read_lock);
+  return FALSE;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -833,7 +854,8 @@ static void
 message_to_write_data_free (MessageToWriteData *data)
 {
   _g_dbus_worker_unref (data->worker);
-  g_object_unref (data->message);
+  if (data->message)
+    g_object_unref (data->message);
   g_free (data->blob);
   g_free (data);
 }
@@ -842,7 +864,11 @@ message_to_write_data_free (MessageToWriteData *data)
 
 static void write_message_continue_writing (MessageToWriteData *data);
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static void
 write_message_async_cb (GObject      *source_object,
                         GAsyncResult *res,
@@ -864,8 +890,7 @@ write_message_async_cb (GObject      *source_object,
                                                 &error);
   if (bytes_written == -1)
     {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
+      g_simple_async_result_take_error (simple, error);
       g_simple_async_result_complete (simple);
       g_object_unref (simple);
       goto out;
@@ -889,7 +914,11 @@ write_message_async_cb (GObject      *source_object,
   ;
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static gboolean
 on_socket_ready (GSocket      *socket,
                  GIOCondition  condition,
@@ -900,7 +929,11 @@ on_socket_ready (GSocket      *socket,
   return FALSE; /* remove source */
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static void
 write_message_continue_writing (MessageToWriteData *data)
 {
@@ -969,7 +1002,7 @@ write_message_continue_writing (MessageToWriteData *data)
       if (bytes_written == -1)
         {
           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
-          if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
+          if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
             {
               GSource *source;
               source = g_socket_create_source (data->worker->socket,
@@ -981,10 +1014,10 @@ write_message_continue_writing (MessageToWriteData *data)
                                      NULL); /* GDestroyNotify */
               g_source_attach (source, g_main_context_get_thread_default ());
               g_source_unref (source);
+              g_error_free (error);
               goto out;
             }
-          g_simple_async_result_set_from_error (simple, error);
-          g_error_free (error);
+          g_simple_async_result_take_error (simple, error);
           g_simple_async_result_complete (simple);
           g_object_unref (simple);
           goto out;
@@ -1007,6 +1040,7 @@ write_message_continue_writing (MessageToWriteData *data)
 #endif
   else
     {
+#ifdef G_OS_UNIX
       if (fd_list != NULL)
         {
           g_simple_async_result_set_error (simple,
@@ -1018,6 +1052,7 @@ write_message_continue_writing (MessageToWriteData *data)
           g_object_unref (simple);
           goto out;
         }
+#endif
 
       g_output_stream_write_async (ostream,
                                    (const gchar *) data->blob + data->total_written,
@@ -1031,7 +1066,11 @@ write_message_continue_writing (MessageToWriteData *data)
   ;
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static void
 write_message_async (GDBusWorker         *worker,
                      MessageToWriteData  *data,
@@ -1067,7 +1106,29 @@ typedef struct
   GList *flushers;
 } FlushAsyncData;
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+flush_data_list_complete (const GList  *flushers,
+                          const GError *error)
+{
+  const GList *l;
+
+  for (l = flushers; l != NULL; l = l->next)
+    {
+      FlushData *f = l->data;
+
+      f->error = error != NULL ? g_error_copy (error) : NULL;
+
+      g_mutex_lock (f->mutex);
+      g_cond_signal (f->cond);
+      g_mutex_unlock (f->mutex);
+    }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static void
 ostream_flush_cb (GObject      *source_object,
                   GAsyncResult *res,
@@ -1075,7 +1136,6 @@ ostream_flush_cb (GObject      *source_object,
 {
   FlushAsyncData *data = user_data;
   GError *error;
-  GList *l;
 
   error = NULL;
   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
@@ -1096,21 +1156,19 @@ ostream_flush_cb (GObject      *source_object,
     }
 
   g_assert (data->flushers != NULL);
-  for (l = data->flushers; l != NULL; l = l->next)
-    {
-      FlushData *f = l->data;
-
-      f->error = error != NULL ? g_error_copy (error) : NULL;
-
-      g_mutex_lock (f->mutex);
-      g_cond_signal (f->cond);
-      g_mutex_unlock (f->mutex);
-    }
+  flush_data_list_complete (data->flushers, error);
   g_list_free (data->flushers);
 
   if (error != NULL)
     g_error_free (error);
 
+  /* Make sure we tell folks that we don't have additional
+     flushes pending */
+  g_mutex_lock (data->worker->write_lock);
+  g_assert (data->worker->output_pending);
+  data->worker->output_pending = FALSE;
+  g_mutex_unlock (data->worker->write_lock);
+
   /* OK, cool, finally kick off the next write */
   maybe_write_next_message (data->worker);
 
@@ -1118,7 +1176,11 @@ ostream_flush_cb (GObject      *source_object,
   g_free (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is false on entry
+ */
 static void
 message_written (GDBusWorker *worker,
                  MessageToWriteData *message_data)
@@ -1163,6 +1225,11 @@ message_written (GDBusWorker *worker,
           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
         }
     }
+  if (flushers != NULL)
+    {
+      g_assert (!worker->output_pending);
+      worker->output_pending = TRUE;
+    }
   g_mutex_unlock (worker->write_lock);
 
   if (flushers != NULL)
@@ -1185,7 +1252,11 @@ message_written (GDBusWorker *worker,
     }
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
 static void
 write_message_cb (GObject       *source_object,
                   GAsyncResult  *res,
@@ -1195,7 +1266,8 @@ write_message_cb (GObject       *source_object,
   GError *error;
 
   g_mutex_lock (data->worker->write_lock);
-  data->worker->num_writes_pending -= 1;
+  g_assert (data->worker->output_pending);
+  data->worker->output_pending = FALSE;
   g_mutex_unlock (data->worker->write_lock);
 
   error = NULL;
@@ -1215,22 +1287,114 @@ write_message_cb (GObject       *source_object,
   message_to_write_data_free (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+iostream_close_cb (GObject      *source_object,
+                   GAsyncResult *res,
+                   gpointer      user_data)
+{
+  GDBusWorker *worker = user_data;
+  GError *error = NULL;
+  GList *pending_close_attempts, *pending_flush_attempts;
+  GQueue *send_queue;
+
+  g_io_stream_close_finish (worker->stream, res, &error);
+
+  g_mutex_lock (worker->write_lock);
+
+  pending_close_attempts = worker->pending_close_attempts;
+  worker->pending_close_attempts = NULL;
+
+  pending_flush_attempts = worker->write_pending_flushes;
+  worker->write_pending_flushes = NULL;
+
+  send_queue = worker->write_queue;
+  worker->write_queue = g_queue_new ();
+
+  g_assert (worker->output_pending);
+  worker->output_pending = FALSE;
+
+  g_mutex_unlock (worker->write_lock);
+
+  while (pending_close_attempts != NULL)
+    {
+      CloseData *close_data = pending_close_attempts->data;
+
+      pending_close_attempts = g_list_delete_link (pending_close_attempts,
+                                                   pending_close_attempts);
+
+      if (close_data->result != NULL)
+        {
+          if (error != NULL)
+            g_simple_async_result_set_from_error (close_data->result, error);
+
+          /* this must be in an idle because the result is likely to be
+           * intended for another thread
+           */
+          g_simple_async_result_complete_in_idle (close_data->result);
+        }
+
+      close_data_free (close_data);
+    }
+
+  g_clear_error (&error);
+
+  /* all messages queued for sending are discarded */
+  g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+  g_queue_free (send_queue);
+
+  /* all queued flushes fail */
+  error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+                       _("Operation was cancelled"));
+  flush_data_list_complete (pending_flush_attempts, error);
+  g_list_free (pending_flush_attempts);
+  g_clear_error (&error);
+
+  _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be false on entry
+ */
 static void
 maybe_write_next_message (GDBusWorker *worker)
 {
   MessageToWriteData *data;
 
  write_next:
+  /* we mustn't try to write two things at once */
+  g_assert (!worker->output_pending);
 
   g_mutex_lock (worker->write_lock);
-  data = g_queue_pop_head (worker->write_queue);
-  if (data != NULL)
-    worker->num_writes_pending += 1;
+
+  /* if we want to close the connection, that takes precedence */
+  if (worker->pending_close_attempts != NULL)
+    {
+      worker->output_pending = TRUE;
+
+      g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+                               NULL, iostream_close_cb,
+                               _g_dbus_worker_ref (worker));
+      data = NULL;
+    }
+  else
+    {
+      data = g_queue_pop_head (worker->write_queue);
+
+      if (data != NULL)
+        worker->output_pending = TRUE;
+    }
+
   g_mutex_unlock (worker->write_lock);
 
   /* Note that write_lock is only used for protecting the @write_queue
-   * and @num_writes_pending fields of the GDBusWorker struct ... which we
+   * and @output_pending fields of the GDBusWorker struct ... which we
    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
    *
    * Therefore, it's fine to drop it here when calling back into user
@@ -1239,60 +1403,102 @@ maybe_write_next_message (GDBusWorker *worker)
    */
   if (data != NULL)
     {
-      gboolean message_was_dropped;
-      message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
-      if (G_UNLIKELY (message_was_dropped))
+      GDBusMessage *old_message;
+      guchar *new_blob;
+      gsize new_blob_size;
+      GError *error;
+
+      old_message = data->message;
+      data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+      if (data->message == old_message)
         {
+          /* filters had no effect - do nothing */
+        }
+      else if (data->message == NULL)
+        {
+          /* filters dropped message */
           g_mutex_lock (worker->write_lock);
-          worker->num_writes_pending -= 1;
+          worker->output_pending = FALSE;
           g_mutex_unlock (worker->write_lock);
           message_to_write_data_free (data);
           goto write_next;
         }
       else
         {
-          write_message_async (worker,
-                               data,
-                               write_message_cb,
-                               data);
+          /* filters altered the message -> reencode */
+          error = NULL;
+          new_blob = g_dbus_message_to_blob (data->message,
+                                             &new_blob_size,
+                                             worker->capabilities,
+                                             &error);
+          if (new_blob == NULL)
+            {
+              /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+               * the old message instead
+               */
+              g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+                         g_dbus_message_get_serial (data->message),
+                         error->message);
+              g_error_free (error);
+            }
+          else
+            {
+              g_free (data->blob);
+              data->blob = (gchar *) new_blob;
+              data->blob_size = new_blob_size;
+            }
         }
+
+      write_message_async (worker,
+                           data,
+                           write_message_cb,
+                           data);
     }
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be true or false
+ */
 static gboolean
 write_message_in_idle_cb (gpointer user_data)
 {
   GDBusWorker *worker = user_data;
-  if (worker->num_writes_pending == 0)
+
+  /* Because this is the worker thread, we can read this struct member
+   * without holding the lock: no other thread ever modifies it.
+   */
+  if (!worker->output_pending)
     maybe_write_next_message (worker);
+
   return FALSE;
 }
 
-/* ---------------------------------------------------------------------------------------------------- */
-
-/* can be called from any thread - steals blob */
-void
-_g_dbus_worker_send_message (GDBusWorker    *worker,
-                             GDBusMessage   *message,
-                             gchar          *blob,
-                             gsize           blob_len)
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+static void
+schedule_write_in_worker_thread (GDBusWorker        *worker,
+                                 MessageToWriteData *write_data,
+                                 CloseData          *close_data)
 {
-  MessageToWriteData *data;
+  g_mutex_lock (worker->write_lock);
 
-  g_return_if_fail (G_IS_DBUS_MESSAGE (message));
-  g_return_if_fail (blob != NULL);
-  g_return_if_fail (blob_len > 16);
+  if (write_data != NULL)
+    g_queue_push_tail (worker->write_queue, write_data);
 
-  data = g_new0 (MessageToWriteData, 1);
-  data->worker = _g_dbus_worker_ref (worker);
-  data->message = g_object_ref (message);
-  data->blob = blob; /* steal! */
-  data->blob_size = blob_len;
+  if (close_data != NULL)
+    worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+                                                     close_data);
 
-  g_mutex_lock (worker->write_lock);
-  g_queue_push_tail (worker->write_queue, data);
-  if (worker->num_writes_pending == 0)
+  if (!worker->output_pending)
     {
       GSource *idle_source;
       idle_source = g_idle_source_new ();
@@ -1301,25 +1507,43 @@ _g_dbus_worker_send_message (GDBusWorker    *worker,
                              write_message_in_idle_cb,
                              _g_dbus_worker_ref (worker),
                              (GDestroyNotify) _g_dbus_worker_unref);
-      g_source_attach (idle_source, shared_thread_data->context);
+      g_source_attach (idle_source, worker->shared_thread_data->context);
       g_source_unref (idle_source);
     }
+
   g_mutex_unlock (worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-static void
-_g_dbus_worker_thread_begin_func (gpointer user_data)
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_send_message (GDBusWorker    *worker,
+                             GDBusMessage   *message,
+                             gchar          *blob,
+                             gsize           blob_len)
 {
-  GDBusWorker *worker = user_data;
+  MessageToWriteData *data;
+
+  g_return_if_fail (G_IS_DBUS_MESSAGE (message));
+  g_return_if_fail (blob != NULL);
+  g_return_if_fail (blob_len > 16);
 
-  worker->thread = g_thread_self ();
+  data = g_new0 (MessageToWriteData, 1);
+  data->worker = _g_dbus_worker_ref (worker);
+  data->message = g_object_ref (message);
+  data->blob = blob; /* steal! */
+  data->blob_size = blob_len;
 
-  /* begin reading */
-  _g_dbus_worker_do_read (worker);
+  schedule_write_in_worker_thread (worker, data, NULL);
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
 GDBusWorker *
 _g_dbus_worker_new (GIOStream                              *stream,
                     GDBusCapabilityFlags                    capabilities,
@@ -1330,6 +1554,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
                     gpointer                                user_data)
 {
   GDBusWorker *worker;
+  GSource *idle_source;
 
   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
   g_return_val_if_fail (message_received_callback != NULL, NULL);
@@ -1347,6 +1572,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->stream = g_object_ref (stream);
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
+  worker->output_pending = FALSE;
 
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
@@ -1357,30 +1583,66 @@ _g_dbus_worker_new (GIOStream                              *stream,
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
-  _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
+  worker->shared_thread_data = _g_dbus_shared_thread_ref ();
+
+  /* begin reading */
+  idle_source = g_idle_source_new ();
+  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (idle_source,
+                         _g_dbus_worker_do_initial_read,
+                         worker,
+                         NULL);
+  g_source_attach (idle_source, worker->shared_thread_data->context);
+  g_source_unref (idle_source);
 
   return worker;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* This can be called from any thread - frees worker - guarantees no callbacks
- * will ever be issued again
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_close (GDBusWorker         *worker,
+                      GCancellable        *cancellable,
+                      GSimpleAsyncResult  *result)
+{
+  CloseData *close_data;
+
+  close_data = g_slice_new0 (CloseData);
+  close_data->worker = _g_dbus_worker_ref (worker);
+  close_data->cancellable =
+      (cancellable == NULL ? NULL : g_object_ref (cancellable));
+  close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+  g_cancellable_cancel (worker->cancellable);
+  schedule_write_in_worker_thread (worker, NULL, close_data);
+}
+
+/* This can be called from any thread - frees worker. Note that
+ * callbacks might still happen if called from another thread than the
+ * worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
  */
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
 {
-  /* If we're called in the worker thread it means we are called from
-   * a worker callback. This is fine, we just can't lock in that case since
-   * we're already holding the lock...
+  g_atomic_int_set (&worker->stopped, TRUE);
+
+  /* Cancel any pending operations and schedule a close of the underlying I/O
+   * stream in the worker thread
    */
-  if (g_thread_self () != worker->thread)
-    g_mutex_lock (worker->read_lock);
-  worker->stopped = TRUE;
-  if (g_thread_self () != worker->thread)
-    g_mutex_unlock (worker->read_lock);
+  _g_dbus_worker_close (worker, NULL, NULL);
 
-  g_cancellable_cancel (worker->cancellable);
+  /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+   * thread has run, so we no longer need to unref in an idle like in
+   * commit 322e25b535
+   */
   _g_dbus_worker_unref (worker);
 }
 
@@ -1389,6 +1651,9 @@ _g_dbus_worker_stop (GDBusWorker *worker)
 /* can be called from any thread (except the worker thread) - blocks
  * calling thread until all queued outgoing messages are written and
  * the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
  */
 gboolean
 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
@@ -1551,6 +1816,7 @@ _g_dbus_initialize (void)
       const gchar *debug;
 
       g_dbus_error_domain = G_DBUS_ERROR;
+      (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
 
       debug = g_getenv ("G_DBUS_DEBUG");
       if (debug != NULL)
@@ -1787,3 +2053,21 @@ read_message_print_transport_debug (gssize bytes_read,
  out:
   ;
 }
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gboolean
+_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
+                                     GValue                *return_accu,
+                                     const GValue          *handler_return,
+                                     gpointer               dummy)
+{
+  gboolean continue_emission;
+  gboolean signal_return;
+
+  signal_return = g_value_get_boolean (handler_return);
+  g_value_set_boolean (return_accu, signal_return);
+  continue_emission = signal_return;
+
+  return continue_emission;
+}