schedule_write_in_worker_thread: require caller to lock; rename accordingly
[platform/upstream/glib.git] / gio / gdbusprivate.c
index c30a4ec..6257b35 100644 (file)
@@ -331,6 +331,13 @@ _g_dbus_shared_thread_unref (SharedThreadData *data)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+typedef enum {
+    PENDING_NONE = 0,
+    PENDING_WRITE,
+    PENDING_FLUSH,
+    PENDING_CLOSE
+} OutputPending;
+
 struct GDBusWorker
 {
   volatile gint                       ref_count;
@@ -368,12 +375,12 @@ struct GDBusWorker
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
 
-  /* TRUE if an async write, flush or close is pending.
+  /* Whether an async write, flush or close, or none of those, is pending.
    * Only the worker thread may change its value, and only with the write_lock.
    * Other threads may read its value when holding the write_lock.
    * The worker thread may read its value at any time.
    */
-  gboolean                            output_pending;
+  OutputPending                       output_pending;
   /* used for writing */
   GMutex                              write_lock;
   /* queue of MessageToWriteData, protected by write_lock */
@@ -384,6 +391,8 @@ struct GDBusWorker
   GList                              *write_pending_flushes;
   /* list of CloseData, protected by write_lock */
   GList                              *pending_close_attempts;
+  /* no lock - only used from the worker thread */
+  gboolean                            close_expected;
 };
 
 static void _g_dbus_worker_unref (GDBusWorker *worker);
@@ -647,7 +656,40 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
   if (bytes_read == -1)
     {
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+      if (G_UNLIKELY (_g_dbus_debug_transport ()))
+        {
+          _g_dbus_debug_print_lock ();
+          g_print ("========================================================================\n"
+                   "GDBus-debug:Transport:\n"
+                   "  ---- READ ERROR on stream of type %s:\n"
+                   "  ---- %s %d: %s\n",
+                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+                   g_quark_to_string (error->domain), error->code,
+                   error->message);
+          _g_dbus_debug_print_unlock ();
+        }
+
+      /* Every async read that uses this callback uses worker->cancellable
+       * as its GCancellable. worker->cancellable gets cancelled if and only
+       * if the GDBusConnection tells us to close (either via
+       * _g_dbus_worker_stop, which is called on last-unref, or directly),
+       * so a cancelled read must mean our connection was closed locally.
+       *
+       * If we're closing, other errors are possible - notably,
+       * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+       * read in-flight. It seems sensible to treat all read errors during
+       * closing as an expected thing that doesn't trip exit-on-close.
+       *
+       * Because close_expected can't be set until we get into the worker
+       * thread, but the cancellable is signalled sooner (from another
+       * thread), we do still need to check the error.
+       */
+      if (worker->close_expected ||
+          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+      else
+        _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
       g_error_free (error);
       goto out;
     }
@@ -783,6 +825,10 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 static void
 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
 {
+  /* Note that we do need to keep trying to read even if close_expected is
+   * true, because only failing a read causes us to signal 'closed'.
+   */
+
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -864,7 +910,7 @@ static void write_message_continue_writing (MessageToWriteData *data);
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_async_cb (GObject      *source_object,
@@ -914,7 +960,7 @@ write_message_async_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static gboolean
 on_socket_ready (GSocket      *socket,
@@ -929,7 +975,7 @@ on_socket_ready (GSocket      *socket,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_continue_writing (MessageToWriteData *data)
@@ -1066,7 +1112,7 @@ write_message_continue_writing (MessageToWriteData *data)
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_async (GDBusWorker         *worker,
@@ -1095,7 +1141,7 @@ write_message_finish (GAsyncResult   *res,
 }
 /* ---------------------------------------------------------------------------------------------------- */
 
-static void maybe_write_next_message (GDBusWorker *worker);
+static void continue_writing (GDBusWorker *worker);
 
 typedef struct
 {
@@ -1124,7 +1170,7 @@ flush_data_list_complete (const GList  *flushers,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_FLUSH on entry
  */
 static void
 ostream_flush_cb (GObject      *source_object,
@@ -1162,12 +1208,12 @@ ostream_flush_cb (GObject      *source_object,
   /* Make sure we tell folks that we don't have additional
      flushes pending */
   g_mutex_lock (&data->worker->write_lock);
-  g_assert (data->worker->output_pending);
-  data->worker->output_pending = FALSE;
+  g_assert (data->worker->output_pending == PENDING_FLUSH);
+  data->worker->output_pending = PENDING_NONE;
   g_mutex_unlock (&data->worker->write_lock);
 
   /* OK, cool, finally kick off the next write */
-  maybe_write_next_message (data->worker);
+  continue_writing (data->worker);
 
   _g_dbus_worker_unref (data->worker);
   g_free (data);
@@ -1176,7 +1222,7 @@ ostream_flush_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is false on entry
+ * output_pending is PENDING_NONE on entry
  */
 static void
 message_written (GDBusWorker *worker,
@@ -1224,8 +1270,8 @@ message_written (GDBusWorker *worker,
     }
   if (flushers != NULL)
     {
-      g_assert (!worker->output_pending);
-      worker->output_pending = TRUE;
+      g_assert (worker->output_pending == PENDING_NONE);
+      worker->output_pending = PENDING_FLUSH;
     }
   g_mutex_unlock (&worker->write_lock);
 
@@ -1245,14 +1291,14 @@ message_written (GDBusWorker *worker,
   else
     {
       /* kick off the next write! */
-      maybe_write_next_message (worker);
+      continue_writing (worker);
     }
 }
 
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
  */
 static void
 write_message_cb (GObject       *source_object,
@@ -1263,8 +1309,8 @@ write_message_cb (GObject       *source_object,
   GError *error;
 
   g_mutex_lock (&data->worker->write_lock);
-  g_assert (data->worker->output_pending);
-  data->worker->output_pending = FALSE;
+  g_assert (data->worker->output_pending == PENDING_WRITE);
+  data->worker->output_pending = PENDING_NONE;
   g_mutex_unlock (&data->worker->write_lock);
 
   error = NULL;
@@ -1287,7 +1333,7 @@ write_message_cb (GObject       *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_CLOSE on entry
  */
 static void
 iostream_close_cb (GObject      *source_object,
@@ -1312,8 +1358,8 @@ iostream_close_cb (GObject      *source_object,
   send_queue = worker->write_queue;
   worker->write_queue = g_queue_new ();
 
-  g_assert (worker->output_pending);
-  worker->output_pending = FALSE;
+  g_assert (worker->output_pending == PENDING_CLOSE);
+  worker->output_pending = PENDING_NONE;
 
   g_mutex_unlock (&worker->write_lock);
 
@@ -1357,23 +1403,24 @@ iostream_close_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending must be false on entry
+ * output_pending must be PENDING_NONE on entry
  */
 static void
-maybe_write_next_message (GDBusWorker *worker)
+continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
 
  write_next:
   /* we mustn't try to write two things at once */
-  g_assert (!worker->output_pending);
+  g_assert (worker->output_pending == PENDING_NONE);
 
   g_mutex_lock (&worker->write_lock);
 
   /* if we want to close the connection, that takes precedence */
   if (worker->pending_close_attempts != NULL)
     {
-      worker->output_pending = TRUE;
+      worker->close_expected = TRUE;
+      worker->output_pending = PENDING_CLOSE;
 
       g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
                                NULL, iostream_close_cb,
@@ -1385,7 +1432,7 @@ maybe_write_next_message (GDBusWorker *worker)
       data = g_queue_pop_head (worker->write_queue);
 
       if (data != NULL)
-        worker->output_pending = TRUE;
+        worker->output_pending = PENDING_WRITE;
     }
 
   g_mutex_unlock (&worker->write_lock);
@@ -1415,7 +1462,7 @@ maybe_write_next_message (GDBusWorker *worker)
         {
           /* filters dropped message */
           g_mutex_lock (&worker->write_lock);
-          worker->output_pending = FALSE;
+          worker->output_pending = PENDING_NONE;
           g_mutex_unlock (&worker->write_lock);
           message_to_write_data_free (data);
           goto write_next;
@@ -1456,18 +1503,18 @@ maybe_write_next_message (GDBusWorker *worker)
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 static gboolean
-write_message_in_idle_cb (gpointer user_data)
+continue_writing_in_idle_cb (gpointer user_data)
 {
   GDBusWorker *worker = user_data;
 
   /* Because this is the worker thread, we can read this struct member
    * without holding the lock: no other thread ever modifies it.
    */
-  if (!worker->output_pending)
-    maybe_write_next_message (worker);
+  if (worker->output_pending == PENDING_NONE)
+    continue_writing (worker);
 
   return FALSE;
 }
@@ -1478,16 +1525,14 @@ write_message_in_idle_cb (gpointer user_data)
  *
  * Can be called from any thread
  *
- * write_lock is not held on entry
- * output_pending may be true or false
+ * write_lock is held on entry
+ * output_pending may be anything
  */
 static void
-schedule_write_in_worker_thread (GDBusWorker        *worker,
-                                 MessageToWriteData *write_data,
-                                 CloseData          *close_data)
+schedule_writing_unlocked (GDBusWorker        *worker,
+                           MessageToWriteData *write_data,
+                           CloseData          *close_data)
 {
-  g_mutex_lock (&worker->write_lock);
-
   if (write_data != NULL)
     g_queue_push_tail (worker->write_queue, write_data);
 
@@ -1495,20 +1540,18 @@ schedule_write_in_worker_thread (GDBusWorker        *worker,
     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
                                                      close_data);
 
-  if (!worker->output_pending)
+  if (worker->output_pending == PENDING_NONE)
     {
       GSource *idle_source;
       idle_source = g_idle_source_new ();
       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
       g_source_set_callback (idle_source,
-                             write_message_in_idle_cb,
+                             continue_writing_in_idle_cb,
                              _g_dbus_worker_ref (worker),
                              (GDestroyNotify) _g_dbus_worker_unref);
       g_source_attach (idle_source, worker->shared_thread_data->context);
       g_source_unref (idle_source);
     }
-
-  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1516,7 +1559,7 @@ schedule_write_in_worker_thread (GDBusWorker        *worker,
 /* can be called from any thread - steals blob
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_send_message (GDBusWorker    *worker,
@@ -1536,7 +1579,9 @@ _g_dbus_worker_send_message (GDBusWorker    *worker,
   data->blob = blob; /* steal! */
   data->blob_size = blob_len;
 
-  schedule_write_in_worker_thread (worker, data, NULL);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, data, NULL);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1569,7 +1614,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->stream = g_object_ref (stream);
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
-  worker->output_pending = FALSE;
+  worker->output_pending = PENDING_NONE;
 
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
@@ -1600,7 +1645,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
 /* can be called from any thread
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_close (GDBusWorker         *worker,
@@ -1615,8 +1660,13 @@ _g_dbus_worker_close (GDBusWorker         *worker,
       (cancellable == NULL ? NULL : g_object_ref (cancellable));
   close_data->result = (result == NULL ? NULL : g_object_ref (result));
 
+  /* Don't set worker->close_expected here - we're in the wrong thread.
+   * It'll be set before the actual close happens.
+   */
   g_cancellable_cancel (worker->cancellable);
-  schedule_write_in_worker_thread (worker, NULL, close_data);
+  g_mutex_lock (&worker->write_lock);
+  schedule_writing_unlocked (worker, NULL, close_data);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* This can be called from any thread - frees worker. Note that
@@ -1624,7 +1674,7 @@ _g_dbus_worker_close (GDBusWorker         *worker,
  * worker - use your own synchronization primitive in the callbacks.
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
@@ -1650,7 +1700,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
  * the transport has been flushed
  *
  * write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
  */
 gboolean
 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
@@ -1940,17 +1990,26 @@ gchar *
 _g_dbus_get_machine_id (GError **error)
 {
   gchar *ret;
+  GError *first_error;
   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
   ret = NULL;
+  first_error = NULL;
   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
                             &ret,
                             NULL,
-                            error))
+                            &first_error) &&
+      !g_file_get_contents ("/etc/machine-id",
+                            &ret,
+                            NULL,
+                            NULL))
     {
-      g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+      g_propagate_prefixed_error (error, first_error,
+                                  _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
     }
   else
     {
+      /* ignore the error from the first try, if any */
+      g_clear_error (&first_error);
       /* TODO: validate value */
       g_strstrip (ret);
     }