if (connection->stream != NULL)
{
- /* We don't really care if closing the stream succeeds or not */
- g_io_stream_close_async (connection->stream,
- G_PRIORITY_DEFAULT,
- NULL, /* GCancellable */
- NULL, /* GAsyncReadyCallback */
- NULL); /* userdata */
g_object_unref (connection->stream);
connection->stream = NULL;
}
/* ---------------------------------------------------------------------------------------------------- */
-static void
-close_in_thread_func (GSimpleAsyncResult *res,
- GObject *object,
- GCancellable *cancellable)
-{
- GError *error;
-
- error = NULL;
- if (!g_dbus_connection_close_sync (G_DBUS_CONNECTION (object),
- cancellable,
- &error))
- g_simple_async_result_take_error (res, error);
-}
-
/**
* g_dbus_connection_close:
* @connection: A #GDBusConnection.
callback,
user_data,
g_dbus_connection_close);
- g_simple_async_result_run_in_thread (simple,
- close_in_thread_func,
- G_PRIORITY_DEFAULT,
- cancellable);
+ _g_dbus_worker_close (connection->worker, cancellable, simple);
g_object_unref (simple);
}
return ret;
}
+typedef struct {
+ GMainLoop *loop;
+ GAsyncResult *result;
+} SyncCloseData;
+
+static void
+sync_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ SyncCloseData *data = user_data;
+
+ data->result = g_object_ref (res);
+ g_main_loop_quit (data->loop);
+}
+
/**
* g_dbus_connection_close_sync:
* @connection: A #GDBusConnection.
CONNECTION_LOCK (connection);
if (!connection->closed)
{
- ret = g_io_stream_close (connection->stream,
- cancellable,
- error);
- if (ret)
- set_closed_unlocked (connection, FALSE, NULL);
+ GMainContext *context;
+ SyncCloseData data;
+
+ context = g_main_context_new ();
+ g_main_context_push_thread_default (context);
+ data.loop = g_main_loop_new (context, TRUE);
+ data.result = NULL;
+
+ CONNECTION_UNLOCK (connection);
+ g_dbus_connection_close (connection, cancellable, sync_close_cb, &data);
+ g_main_loop_run (data.loop);
+ ret = g_dbus_connection_close_finish (connection, data.result, error);
+ CONNECTION_LOCK (connection);
+
+ g_object_unref (data.result);
+ g_main_loop_unref (data.loop);
+ g_main_context_pop_thread_default (context);
+ g_main_context_unref (context);
}
else
{
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
- /* TRUE if an async write or flush is pending.
+ /* TRUE if an async write, flush or close is pending.
* Only the worker thread may change its value, and only with the write_lock.
* Other threads may read its value when holding the write_lock.
* The worker thread may read its value at any time.
GQueue *write_queue;
guint64 write_num_messages_written;
GList *write_pending_flushes;
+ /* list of CloseData */
+ GList *pending_close_attempts;
};
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
/* ---------------------------------------------------------------------------------------------------- */
typedef struct
static void write_message_print_transport_debug (gssize bytes_written,
MessageToWriteData *data);
+typedef struct {
+ GDBusWorker *worker;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+ if (close_data->cancellable != NULL)
+ g_object_unref (close_data->cancellable);
+
+ if (close_data->result != NULL)
+ g_object_unref (close_data->result);
+
+ _g_dbus_worker_unref (close_data->worker);
+ g_slice_free (CloseData, close_data);
+}
+
/* ---------------------------------------------------------------------------------------------------- */
static GDBusWorker *
GList *flushers;
} FlushAsyncData;
+static void
+flush_data_list_complete (const GList *flushers,
+ const GError *error)
+{
+ const GList *l;
+
+ for (l = flushers; l != NULL; l = l->next)
+ {
+ FlushData *f = l->data;
+
+ f->error = error != NULL ? g_error_copy (error) : NULL;
+
+ g_mutex_lock (f->mutex);
+ g_cond_signal (f->cond);
+ g_mutex_unlock (f->mutex);
+ }
+}
+
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
ostream_flush_cb (GObject *source_object,
{
FlushAsyncData *data = user_data;
GError *error;
- GList *l;
error = NULL;
g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
}
g_assert (data->flushers != NULL);
- for (l = data->flushers; l != NULL; l = l->next)
- {
- FlushData *f = l->data;
-
- f->error = error != NULL ? g_error_copy (error) : NULL;
-
- g_mutex_lock (f->mutex);
- g_cond_signal (f->cond);
- g_mutex_unlock (f->mutex);
- }
+ flush_data_list_complete (data->flushers, error);
g_list_free (data->flushers);
if (error != NULL)
message_to_write_data_free (data);
}
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+iostream_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GError *error = NULL;
+ GList *pending_close_attempts, *pending_flush_attempts;
+ GQueue *send_queue;
+
+ g_io_stream_close_finish (worker->stream, res, &error);
+
+ g_mutex_lock (worker->write_lock);
+
+ pending_close_attempts = worker->pending_close_attempts;
+ worker->pending_close_attempts = NULL;
+
+ pending_flush_attempts = worker->write_pending_flushes;
+ worker->write_pending_flushes = NULL;
+
+ send_queue = worker->write_queue;
+ worker->write_queue = g_queue_new ();
+
+ g_assert (worker->output_pending);
+ worker->output_pending = FALSE;
+
+ g_mutex_unlock (worker->write_lock);
+
+ while (pending_close_attempts != NULL)
+ {
+ CloseData *close_data = pending_close_attempts->data;
+
+ pending_close_attempts = g_list_delete_link (pending_close_attempts,
+ pending_close_attempts);
+
+ if (close_data->result != NULL)
+ {
+ if (error != NULL)
+ g_simple_async_result_set_from_error (close_data->result, error);
+
+ /* this must be in an idle because the result is likely to be
+ * intended for another thread
+ */
+ g_simple_async_result_complete_in_idle (close_data->result);
+ }
+
+ close_data_free (close_data);
+ }
+
+ g_clear_error (&error);
+
+ /* all messages queued for sending are discarded */
+ g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+ g_queue_free (send_queue);
+
+ /* all queued flushes fail */
+ error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ flush_data_list_complete (pending_flush_attempts, error);
+ g_list_free (pending_flush_attempts);
+ g_clear_error (&error);
+
+ _g_dbus_worker_unref (worker);
+}
+
/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static void
maybe_write_next_message (GDBusWorker *worker)
g_assert (!worker->output_pending);
g_mutex_lock (worker->write_lock);
- data = g_queue_pop_head (worker->write_queue);
- if (data != NULL)
- worker->output_pending = TRUE;
+
+ /* if we want to close the connection, that takes precedence */
+ if (worker->pending_close_attempts != NULL)
+ {
+ worker->output_pending = TRUE;
+
+ g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+ NULL, iostream_close_cb,
+ _g_dbus_worker_ref (worker));
+ data = NULL;
+ }
+ else
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = TRUE;
+ }
+
g_mutex_unlock (worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue
return FALSE;
}
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+static void
+schedule_write_in_worker_thread (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ CloseData *close_data)
+{
+ g_mutex_lock (worker->write_lock);
+
+ if (write_data != NULL)
+ g_queue_push_tail (worker->write_queue, write_data);
+
+ if (close_data != NULL)
+ worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+ close_data);
+
+ if (!worker->output_pending)
+ {
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ write_message_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+ }
+
+ g_mutex_unlock (worker->write_lock);
+}
+
/* ---------------------------------------------------------------------------------------------------- */
/* can be called from any thread - steals blob */
data->blob = blob; /* steal! */
data->blob_size = blob_len;
- g_mutex_lock (worker->write_lock);
- g_queue_push_tail (worker->write_queue, data);
- if (!worker->output_pending)
- {
- GSource *idle_source;
- idle_source = g_idle_source_new ();
- g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
- g_source_set_callback (idle_source,
- write_message_in_idle_cb,
- _g_dbus_worker_ref (worker),
- (GDestroyNotify) _g_dbus_worker_unref);
- g_source_attach (idle_source, worker->shared_thread_data->context);
- g_source_unref (idle_source);
- }
- g_mutex_unlock (worker->write_lock);
+ schedule_write_in_worker_thread (worker, data, NULL);
}
/* ---------------------------------------------------------------------------------------------------- */
/* ---------------------------------------------------------------------------------------------------- */
-/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static gboolean
-unref_in_idle_cb (gpointer user_data)
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_close (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GSimpleAsyncResult *result)
{
- GDBusWorker *worker = user_data;
- _g_dbus_worker_unref (worker);
- return FALSE;
+ CloseData *close_data;
+
+ close_data = g_slice_new0 (CloseData);
+ close_data->worker = _g_dbus_worker_ref (worker);
+ close_data->cancellable =
+ (cancellable == NULL ? NULL : g_object_ref (cancellable));
+ close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+ g_cancellable_cancel (worker->cancellable);
+ schedule_write_in_worker_thread (worker, NULL, close_data);
}
/* This can be called from any thread - frees worker. Note that
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- GSource *idle_source;
-
worker->stopped = TRUE;
- g_cancellable_cancel (worker->cancellable);
- idle_source = g_idle_source_new ();
- g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
- g_source_set_callback (idle_source,
- unref_in_idle_cb,
- _g_dbus_worker_ref (worker),
- (GDestroyNotify) _g_dbus_worker_unref);
- g_source_attach (idle_source, worker->shared_thread_data->context);
- g_source_unref (idle_source);
+ /* Cancel any pending operations and schedule a close of the underlying I/O
+ * stream in the worker thread
+ */
+ _g_dbus_worker_close (worker, NULL, NULL);
+
+ /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+ * thread has run, so we no longer need to unref in an idle like in
+ * commit 322e25b535
+ */
+ _g_dbus_worker_unref (worker);
}
/* ---------------------------------------------------------------------------------------------------- */