GDBusWorker: if a read was cancelled it means we closed the connection
[platform/upstream/glib.git] / gio / gdbusprivate.c
index 8bc6e5b..38b17f3 100644 (file)
@@ -287,7 +287,6 @@ static SharedThreadData *
 _g_dbus_shared_thread_ref (void)
 {
   static gsize shared_thread_data = 0;
-  GError *error = NULL;
   SharedThreadData *ret;
 
   if (g_once_init_enter (&shared_thread_data))
@@ -302,11 +301,9 @@ _g_dbus_shared_thread_ref (void)
       
       data->context = g_main_context_new ();
       data->loop = g_main_loop_new (data->context, FALSE);
-      data->thread = g_thread_create (gdbus_shared_thread_func,
-                                     data,
-                                     TRUE,
-                                     &error);
-      g_assert_no_error (error);
+      data->thread = g_thread_new ("gdbus",
+                                   gdbus_shared_thread_func,
+                                   data);
       /* We can cast between gsize and gpointer safely */
       g_once_init_leave (&shared_thread_data, (gsize) data);
     }
@@ -340,7 +337,8 @@ struct GDBusWorker
 
   SharedThreadData                   *shared_thread_data;
 
-  gboolean                            stopped;
+  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+  volatile gint                       stopped;
 
   /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
    * only affects messages received from the other peer (since GDBusServer is the
@@ -361,7 +359,7 @@ struct GDBusWorker
   GSocket *socket;
 
   /* used for reading */
-  GMutex                             *read_lock;
+  GMutex                              read_lock;
   gchar                              *read_buffer;
   gsize                               read_buffer_allocated_size;
   gsize                               read_buffer_cur_size;
@@ -377,7 +375,7 @@ struct GDBusWorker
    */
   gboolean                            output_pending;
   /* used for writing */
-  GMutex                             *write_lock;
+  GMutex                              write_lock;
   /* queue of MessageToWriteData, protected by write_lock */
   GQueue                             *write_queue;
   /* protected by write_lock */
@@ -394,8 +392,8 @@ static void _g_dbus_worker_unref (GDBusWorker *worker);
 
 typedef struct
 {
-  GMutex *mutex;
-  GCond *cond;
+  GMutex  mutex;
+  GCond   cond;
   guint64 number_to_wait_for;
   GError *error;
 } FlushData;
@@ -449,7 +447,7 @@ _g_dbus_worker_unref (GDBusWorker *worker)
 
       g_object_unref (worker->stream);
 
-      g_mutex_free (worker->read_lock);
+      g_mutex_clear (&worker->read_lock);
       g_object_unref (worker->cancellable);
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
@@ -457,7 +455,7 @@ _g_dbus_worker_unref (GDBusWorker *worker)
       g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
       g_queue_free (worker->received_messages_while_frozen);
 
-      g_mutex_free (worker->write_lock);
+      g_mutex_clear (&worker->write_lock);
       g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
       g_queue_free (worker->write_queue);
 
@@ -472,7 +470,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
                                   gboolean      remote_peer_vanished,
                                   GError       *error)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
 }
 
@@ -480,7 +478,7 @@ static void
 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
                                       GDBusMessage *message)
 {
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     worker->message_received_callback (worker, message, worker->user_data);
 }
 
@@ -489,7 +487,7 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
                                               GDBusMessage *message)
 {
   GDBusMessage *ret;
-  if (!worker->stopped)
+  if (!g_atomic_int_get (&worker->stopped))
     ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
   else
     ret = message;
@@ -521,7 +519,7 @@ unfreeze_in_idle_cb (gpointer user_data)
   GDBusWorker *worker = user_data;
   GDBusMessage *message;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
   if (worker->frozen)
     {
       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
@@ -535,7 +533,7 @@ unfreeze_in_idle_cb (gpointer user_data)
     {
       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
     }
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
   return FALSE;
 }
 
@@ -568,10 +566,10 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
   GError *error;
   gssize bytes_read;
 
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
 
   /* If already stopped, don't even process the reply */
-  if (worker->stopped)
+  if (g_atomic_int_get (&worker->stopped))
     goto out;
 
   error = NULL;
@@ -649,7 +647,17 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
   if (bytes_read == -1)
     {
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+      /* Every async read that uses this callback uses worker->cancellable
+       * as its GCancellable. worker->cancellable gets cancelled if and only
+       * if the GDBusConnection tells us to close (either via
+       * _g_dbus_worker_stop, which is called on last-unref, or directly),
+       * so a cancelled read must mean our connection was closed locally.
+       */
+      if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+        _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+      else
+        _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
       g_error_free (error);
       goto out;
     }
@@ -775,7 +783,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     }
 
  out:
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
 
   /* gives up the reference acquired when calling g_input_stream_read_async() */
   _g_dbus_worker_unref (worker);
@@ -829,9 +837,9 @@ static gboolean
 _g_dbus_worker_do_initial_read (gpointer data)
 {
   GDBusWorker *worker = data;
-  g_mutex_lock (worker->read_lock);
+  g_mutex_lock (&worker->read_lock);
   _g_dbus_worker_do_read_unlocked (worker);
-  g_mutex_unlock (worker->read_lock);
+  g_mutex_unlock (&worker->read_lock);
   return FALSE;
 }
 
@@ -1117,9 +1125,9 @@ flush_data_list_complete (const GList  *flushers,
 
       f->error = error != NULL ? g_error_copy (error) : NULL;
 
-      g_mutex_lock (f->mutex);
-      g_cond_signal (f->cond);
-      g_mutex_unlock (f->mutex);
+      g_mutex_lock (&f->mutex);
+      g_cond_signal (&f->cond);
+      g_mutex_unlock (&f->mutex);
     }
 }
 
@@ -1163,10 +1171,10 @@ ostream_flush_cb (GObject      *source_object,
 
   /* Make sure we tell folks that we don't have additional
      flushes pending */
-  g_mutex_lock (data->worker->write_lock);
+  g_mutex_lock (&data->worker->write_lock);
   g_assert (data->worker->output_pending);
   data->worker->output_pending = FALSE;
-  g_mutex_unlock (data->worker->write_lock);
+  g_mutex_unlock (&data->worker->write_lock);
 
   /* OK, cool, finally kick off the next write */
   maybe_write_next_message (data->worker);
@@ -1211,7 +1219,7 @@ message_written (GDBusWorker *worker,
 
   /* then first wake up pending flushes and, if needed, flush the stream */
   flushers = NULL;
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
   worker->write_num_messages_written += 1;
   for (l = worker->write_pending_flushes; l != NULL; l = ll)
     {
@@ -1229,7 +1237,7 @@ message_written (GDBusWorker *worker,
       g_assert (!worker->output_pending);
       worker->output_pending = TRUE;
     }
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   if (flushers != NULL)
     {
@@ -1264,10 +1272,10 @@ write_message_cb (GObject       *source_object,
   MessageToWriteData *data = user_data;
   GError *error;
 
-  g_mutex_lock (data->worker->write_lock);
+  g_mutex_lock (&data->worker->write_lock);
   g_assert (data->worker->output_pending);
   data->worker->output_pending = FALSE;
-  g_mutex_unlock (data->worker->write_lock);
+  g_mutex_unlock (&data->worker->write_lock);
 
   error = NULL;
   if (!write_message_finish (res, &error))
@@ -1303,7 +1311,7 @@ iostream_close_cb (GObject      *source_object,
 
   g_io_stream_close_finish (worker->stream, res, &error);
 
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
 
   pending_close_attempts = worker->pending_close_attempts;
   worker->pending_close_attempts = NULL;
@@ -1317,7 +1325,7 @@ iostream_close_cb (GObject      *source_object,
   g_assert (worker->output_pending);
   worker->output_pending = FALSE;
 
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   while (pending_close_attempts != NULL)
     {
@@ -1370,7 +1378,7 @@ maybe_write_next_message (GDBusWorker *worker)
   /* we mustn't try to write two things at once */
   g_assert (!worker->output_pending);
 
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
 
   /* if we want to close the connection, that takes precedence */
   if (worker->pending_close_attempts != NULL)
@@ -1390,7 +1398,7 @@ maybe_write_next_message (GDBusWorker *worker)
         worker->output_pending = TRUE;
     }
 
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   /* Note that write_lock is only used for protecting the @write_queue
    * and @output_pending fields of the GDBusWorker struct ... which we
@@ -1416,9 +1424,9 @@ maybe_write_next_message (GDBusWorker *worker)
       else if (data->message == NULL)
         {
           /* filters dropped message */
-          g_mutex_lock (worker->write_lock);
+          g_mutex_lock (&worker->write_lock);
           worker->output_pending = FALSE;
-          g_mutex_unlock (worker->write_lock);
+          g_mutex_unlock (&worker->write_lock);
           message_to_write_data_free (data);
           goto write_next;
         }
@@ -1488,7 +1496,7 @@ schedule_write_in_worker_thread (GDBusWorker        *worker,
                                  MessageToWriteData *write_data,
                                  CloseData          *close_data)
 {
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
 
   if (write_data != NULL)
     g_queue_push_tail (worker->write_queue, write_data);
@@ -1510,7 +1518,7 @@ schedule_write_in_worker_thread (GDBusWorker        *worker,
       g_source_unref (idle_source);
     }
 
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -1563,7 +1571,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker = g_new0 (GDBusWorker, 1);
   worker->ref_count = 1;
 
-  worker->read_lock = g_mutex_new ();
+  g_mutex_init (&worker->read_lock);
   worker->message_received_callback = message_received_callback;
   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
   worker->disconnected_callback = disconnected_callback;
@@ -1576,7 +1584,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->frozen = initially_frozen;
   worker->received_messages_while_frozen = g_queue_new ();
 
-  worker->write_lock = g_mutex_new ();
+  g_mutex_init (&worker->write_lock);
   worker->write_queue = g_queue_new ();
 
   if (G_IS_SOCKET_CONNECTION (worker->stream))
@@ -1589,8 +1597,8 @@ _g_dbus_worker_new (GIOStream                              *stream,
   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
   g_source_set_callback (idle_source,
                          _g_dbus_worker_do_initial_read,
-                         worker,
-                         NULL);
+                         _g_dbus_worker_ref (worker),
+                         (GDestroyNotify) _g_dbus_worker_unref);
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 
@@ -1631,7 +1639,7 @@ _g_dbus_worker_close (GDBusWorker         *worker,
 void
 _g_dbus_worker_stop (GDBusWorker *worker)
 {
-  worker->stopped = TRUE;
+  g_atomic_int_set (&worker->stopped, TRUE);
 
   /* Cancel any pending operations and schedule a close of the underlying I/O
    * stream in the worker thread
@@ -1666,26 +1674,26 @@ _g_dbus_worker_flush_sync (GDBusWorker    *worker,
   ret = TRUE;
 
   /* if the queue is empty, there's nothing to wait for */
-  g_mutex_lock (worker->write_lock);
+  g_mutex_lock (&worker->write_lock);
   if (g_queue_get_length (worker->write_queue) > 0)
     {
       data = g_new0 (FlushData, 1);
-      data->mutex = g_mutex_new ();
-      data->cond = g_cond_new ();
+      g_mutex_init (&data->mutex);
+      g_cond_init (&data->cond);
       data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
-      g_mutex_lock (data->mutex);
+      g_mutex_lock (&data->mutex);
       worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
     }
-  g_mutex_unlock (worker->write_lock);
+  g_mutex_unlock (&worker->write_lock);
 
   if (data != NULL)
     {
-      g_cond_wait (data->cond, data->mutex);
-      g_mutex_unlock (data->mutex);
+      g_cond_wait (&data->cond, &data->mutex);
+      g_mutex_unlock (&data->mutex);
 
       /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
-      g_cond_free (data->cond);
-      g_mutex_free (data->mutex);
+      g_cond_clear (&data->cond);
+      g_mutex_clear (&data->mutex);
       if (data->error != NULL)
         {
           ret = FALSE;