Bug 618882 – No way to ensure that a message is sent
[platform/upstream/glib.git] / gio / gdbusprivate.c
index bcc031e..819aa34 100644 (file)
@@ -388,8 +388,17 @@ struct GDBusWorker
   GMutex                             *write_lock;
   GQueue                             *write_queue;
   gboolean                            write_is_pending;
+  guint64                             write_num_messages_written;
+  GList                              *write_pending_flushes;
 };
 
+typedef struct
+{
+  GMutex *mutex;
+  GCond *cond;
+  guint64 number_to_wait_for;
+} FlushData;
+
 struct _MessageToWriteData ;
 typedef struct _MessageToWriteData MessageToWriteData;
 
@@ -407,6 +416,8 @@ _g_dbus_worker_unref (GDBusWorker *worker)
 {
   if (g_atomic_int_dec_and_test (&worker->ref_count))
     {
+      g_assert (worker->write_pending_flushes == NULL);
+
       _g_dbus_shared_thread_unref ();
 
       g_object_unref (worker->stream);
@@ -815,6 +826,8 @@ write_message (GDBusWorker         *worker,
                GError             **error)
 {
   gboolean ret;
+  GList *l;
+  GList *ll;
 
   g_return_val_if_fail (data->blob_size > 16, FALSE);
 
@@ -908,6 +921,24 @@ write_message (GDBusWorker         *worker,
 
   ret = TRUE;
 
+  /* wake up pending flushes */
+  g_mutex_lock (worker->write_lock);
+  worker->write_num_messages_written += 1;
+  for (l = worker->write_pending_flushes; l != NULL; l = ll)
+    {
+      FlushData *f = l->data;
+      ll = l->next;
+
+      if (f->number_to_wait_for == worker->write_num_messages_written)
+        {
+          g_mutex_lock (f->mutex);
+          g_cond_signal (f->cond);
+          g_mutex_unlock (f->mutex);
+          worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+        }
+    }
+  g_mutex_unlock (worker->write_lock);
+
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -1072,6 +1103,8 @@ _g_dbus_worker_new (GIOStream                              *stream,
   return worker;
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
 /* This can be called from any thread - frees worker - guarantees no callbacks
  * will ever be issued again
  */
@@ -1092,6 +1125,54 @@ _g_dbus_worker_stop (GDBusWorker *worker)
   _g_dbus_worker_unref (worker);
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks
+ * calling thread until all queued outgoing messages are written and
+ * the transport has been flushed
+ */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker    *worker,
+                           GCancellable   *cancellable,
+                           GError        **error)
+{
+  gboolean ret;
+  FlushData *data;
+
+  data = NULL;
+
+  /* if the queue is empty, there's nothing to wait for */
+  g_mutex_lock (worker->write_lock);
+  if (g_queue_get_length (worker->write_queue) > 0)
+    {
+      data = g_new0 (FlushData, 1);
+      data->mutex = g_mutex_new ();
+      data->cond = g_cond_new ();
+      data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
+      g_mutex_lock (data->mutex);
+      worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+    }
+  g_mutex_unlock (worker->write_lock);
+
+  if (data != NULL)
+    {
+      g_cond_wait (data->cond, data->mutex);
+      g_mutex_unlock (data->mutex);
+
+      /* note:the element is removed from worker->write_pending_flushes in write_message() */
+      g_cond_free (data->cond);
+      g_mutex_free (data->mutex);
+      g_free (data);
+    }
+
+  ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
+                               cancellable,
+                               error);
+  return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
 #define G_DBUS_DEBUG_MESSAGE        (1<<1)
 #define G_DBUS_DEBUG_PAYLOAD        (1<<2)