write_message_continue_writing (data);
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
static gboolean
write_message_finish (GAsyncResult *res,
GError **error)
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is PENDING_NONE on entry
+ * output_pending is PENDING_FLUSH on entry
*/
static void
-message_written (GDBusWorker *worker,
- MessageToWriteData *message_data)
+start_flush (FlushAsyncData *data)
{
- GList *l;
- GList *ll;
- GList *flushers;
+ g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ ostream_flush_cb,
+ data);
+}
- /* first log the fact that we wrote a message */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
_g_dbus_debug_print_unlock ();
}
- /* then first wake up pending flushes and, if needed, flush the stream */
- flushers = NULL;
- g_mutex_lock (&worker->write_lock);
worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ flushers = NULL;
for (l = worker->write_pending_flushes; l != NULL; l = ll)
{
FlushData *f = l->data;
g_assert (worker->output_pending == PENDING_NONE);
worker->output_pending = PENDING_FLUSH;
}
- g_mutex_unlock (&worker->write_lock);
if (flushers != NULL)
{
FlushAsyncData *data;
+
data = g_new0 (FlushAsyncData, 1);
data->worker = _g_dbus_worker_ref (worker);
data->flushers = flushers;
- /* flush the stream before writing the next message */
- g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
- G_PRIORITY_DEFAULT,
- worker->cancellable,
- ostream_flush_cb,
- data);
- }
- else
- {
- /* kick off the next write! */
- continue_writing (worker);
+ return data;
}
+
+ return NULL;
}
/* called in private thread shared by all GDBusConnection instances
g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending == PENDING_WRITE);
data->worker->output_pending = PENDING_NONE;
- g_mutex_unlock (&data->worker->write_lock);
error = NULL;
if (!write_message_finish (res, &error))
{
+ g_mutex_unlock (&data->worker->write_lock);
+
/* TODO: handle */
_g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
g_error_free (error);
+
+ g_mutex_lock (&data->worker->write_lock);
}
- /* this function will also kick of the next write (it might need to
- * flush so writing the next message might happen much later
- * e.g. async)
- */
- message_written (data->worker, data);
+ message_written_unlocked (data->worker, data);
+
+ g_mutex_unlock (&data->worker->write_lock);
+
+ continue_writing (data->worker);
message_to_write_data_free (data);
}
continue_writing (GDBusWorker *worker)
{
MessageToWriteData *data;
+ FlushAsyncData *flush_async_data;
write_next:
/* we mustn't try to write two things at once */
}
else
{
- data = g_queue_pop_head (worker->write_queue);
+ flush_async_data = prepare_flush_unlocked (worker);
- if (data != NULL)
- worker->output_pending = PENDING_WRITE;
+ if (flush_async_data == NULL)
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = PENDING_WRITE;
+ }
+ else
+ {
+ data = NULL;
+ }
}
g_mutex_unlock (&worker->write_lock);
* code and then writing the message out onto the GIOStream since this
* function only runs on the worker thread.
*/
- if (data != NULL)
+
+ if (flush_async_data != NULL)
+ {
+ start_flush (flush_async_data);
+ g_assert (data == NULL);
+ }
+ else if (data != NULL)
{
GDBusMessage *old_message;
guchar *new_blob;