GDBusWorker: move flush async op into continue_writing()
authorSimon McVittie <simon.mcvittie@collabora.co.uk>
Mon, 21 Nov 2011 17:18:01 +0000 (17:18 +0000)
committerSimon McVittie <simon.mcvittie@collabora.co.uk>
Mon, 21 Nov 2011 18:10:11 +0000 (18:10 +0000)
This makes it easier to schedule a flush, by putting it on the same code
path as writing and closing.

Also change message_written to expect the lock to be held, since all
that's left in that function either wants to hold the lock or doesn't
care, and it's silly to release the lock immediately before calling
message_written, which just takes it again.

Bug: https://bugzilla.gnome.org/show_bug.cgi?id=662395
Signed-off-by: Simon McVittie <simon.mcvittie@collabora.co.uk>
Reviewed-by: Cosimo Alfarano <cosimo.alfarano@collabora.co.uk>
gio/gdbusprivate.c

index 6257b35..48606c4 100644 (file)
@@ -1128,7 +1128,7 @@ write_message_async (GDBusWorker         *worker,
   write_message_continue_writing (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
 static gboolean
 write_message_finish (GAsyncResult   *res,
                       GError        **error)
@@ -1222,17 +1222,27 @@ ostream_flush_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is PENDING_NONE on entry
+ * output_pending is PENDING_FLUSH on entry
  */
 static void
-message_written (GDBusWorker *worker,
-                 MessageToWriteData *message_data)
+start_flush (FlushAsyncData *data)
 {
-  GList *l;
-  GList *ll;
-  GList *flushers;
+  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                               G_PRIORITY_DEFAULT,
+                               data->worker->cancellable,
+                               ostream_flush_cb,
+                               data);
+}
 
-  /* first log the fact that we wrote a message */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+                          MessageToWriteData *message_data)
+{
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -1253,10 +1263,24 @@ message_written (GDBusWorker *worker,
       _g_dbus_debug_print_unlock ();
     }
 
-  /* then first wake up pending flushes and, if needed, flush the stream */
-  flushers = NULL;
-  g_mutex_lock (&worker->write_lock);
   worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+  GList *l;
+  GList *ll;
+  GList *flushers;
+
+  flushers = NULL;
   for (l = worker->write_pending_flushes; l != NULL; l = ll)
     {
       FlushData *f = l->data;
@@ -1273,26 +1297,18 @@ message_written (GDBusWorker *worker,
       g_assert (worker->output_pending == PENDING_NONE);
       worker->output_pending = PENDING_FLUSH;
     }
-  g_mutex_unlock (&worker->write_lock);
 
   if (flushers != NULL)
     {
       FlushAsyncData *data;
+
       data = g_new0 (FlushAsyncData, 1);
       data->worker = _g_dbus_worker_ref (worker);
       data->flushers = flushers;
-      /* flush the stream before writing the next message */
-      g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
-                                   G_PRIORITY_DEFAULT,
-                                   worker->cancellable,
-                                   ostream_flush_cb,
-                                   data);
-    }
-  else
-    {
-      /* kick off the next write! */
-      continue_writing (worker);
+      return data;
     }
+
+  return NULL;
 }
 
 /* called in private thread shared by all GDBusConnection instances
@@ -1311,21 +1327,24 @@ write_message_cb (GObject       *source_object,
   g_mutex_lock (&data->worker->write_lock);
   g_assert (data->worker->output_pending == PENDING_WRITE);
   data->worker->output_pending = PENDING_NONE;
-  g_mutex_unlock (&data->worker->write_lock);
 
   error = NULL;
   if (!write_message_finish (res, &error))
     {
+      g_mutex_unlock (&data->worker->write_lock);
+
       /* TODO: handle */
       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
       g_error_free (error);
+
+      g_mutex_lock (&data->worker->write_lock);
     }
 
-  /* this function will also kick of the next write (it might need to
-   * flush so writing the next message might happen much later
-   * e.g. async)
-   */
-  message_written (data->worker, data);
+  message_written_unlocked (data->worker, data);
+
+  g_mutex_unlock (&data->worker->write_lock);
+
+  continue_writing (data->worker);
 
   message_to_write_data_free (data);
 }
@@ -1409,6 +1428,7 @@ static void
 continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
+  FlushAsyncData *flush_async_data;
 
  write_next:
   /* we mustn't try to write two things at once */
@@ -1429,10 +1449,19 @@ continue_writing (GDBusWorker *worker)
     }
   else
     {
-      data = g_queue_pop_head (worker->write_queue);
+      flush_async_data = prepare_flush_unlocked (worker);
 
-      if (data != NULL)
-        worker->output_pending = PENDING_WRITE;
+      if (flush_async_data == NULL)
+        {
+          data = g_queue_pop_head (worker->write_queue);
+
+          if (data != NULL)
+            worker->output_pending = PENDING_WRITE;
+        }
+      else
+        {
+          data = NULL;
+        }
     }
 
   g_mutex_unlock (&worker->write_lock);
@@ -1445,7 +1474,13 @@ continue_writing (GDBusWorker *worker)
    * code and then writing the message out onto the GIOStream since this
    * function only runs on the worker thread.
    */
-  if (data != NULL)
+
+  if (flush_async_data != NULL)
+    {
+      start_flush (flush_async_data);
+      g_assert (data == NULL);
+    }
+  else if (data != NULL)
     {
       GDBusMessage *old_message;
       guchar *new_blob;