GMutex *write_lock;
GQueue *write_queue;
gboolean write_is_pending;
+ guint64 write_num_messages_written;
+ GList *write_pending_flushes;
};
+typedef struct
+{
+ GMutex *mutex;
+ GCond *cond;
+ guint64 number_to_wait_for;
+} FlushData;
+
struct _MessageToWriteData ;
typedef struct _MessageToWriteData MessageToWriteData;
{
if (g_atomic_int_dec_and_test (&worker->ref_count))
{
+ g_assert (worker->write_pending_flushes == NULL);
+
_g_dbus_shared_thread_unref ();
g_object_unref (worker->stream);
GError **error)
{
gboolean ret;
+ GList *l;
+ GList *ll;
g_return_val_if_fail (data->blob_size > 16, FALSE);
ret = TRUE;
+ /* wake up pending flushes */
+ g_mutex_lock (worker->write_lock);
+ worker->write_num_messages_written += 1;
+ for (l = worker->write_pending_flushes; l != NULL; l = ll)
+ {
+ FlushData *f = l->data;
+ ll = l->next;
+
+ if (f->number_to_wait_for == worker->write_num_messages_written)
+ {
+ g_mutex_lock (f->mutex);
+ g_cond_signal (f->cond);
+ g_mutex_unlock (f->mutex);
+ worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+ }
+ }
+ g_mutex_unlock (worker->write_lock);
+
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
return worker;
}
+/* ---------------------------------------------------------------------------------------------------- */
+
/* This can be called from any thread - frees worker - guarantees no callbacks
* will ever be issued again
*/
_g_dbus_worker_unref (worker);
}
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks
+ * calling thread until all queued outgoing messages are written and
+ * the transport has been flushed
+ */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gboolean ret;
+ FlushData *data;
+
+ data = NULL;
+
+ /* if the queue is empty, there's nothing to wait for */
+ g_mutex_lock (worker->write_lock);
+ if (g_queue_get_length (worker->write_queue) > 0)
+ {
+ data = g_new0 (FlushData, 1);
+ data->mutex = g_mutex_new ();
+ data->cond = g_cond_new ();
+ data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
+ g_mutex_lock (data->mutex);
+ worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+ }
+ g_mutex_unlock (worker->write_lock);
+
+ if (data != NULL)
+ {
+ g_cond_wait (data->cond, data->mutex);
+ g_mutex_unlock (data->mutex);
+
+ /* note:the element is removed from worker->write_pending_flushes in write_message() */
+ g_cond_free (data->cond);
+ g_mutex_free (data->mutex);
+ g_free (data);
+ }
+
+ ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
+ cancellable,
+ error);
+ return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
#define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
#define G_DBUS_DEBUG_MESSAGE (1<<1)
#define G_DBUS_DEBUG_PAYLOAD (1<<2)