_g_dbus_shared_thread_ref (void)
{
static gsize shared_thread_data = 0;
- GError *error = NULL;
SharedThreadData *ret;
if (g_once_init_enter (&shared_thread_data))
data->context = g_main_context_new ();
data->loop = g_main_loop_new (data->context, FALSE);
- data->thread = g_thread_create (gdbus_shared_thread_func,
- data,
- TRUE,
- &error);
- g_assert_no_error (error);
+ data->thread = g_thread_new ("gdbus",
+ gdbus_shared_thread_func,
+ data);
/* We can cast between gsize and gpointer safely */
g_once_init_leave (&shared_thread_data, (gsize) data);
}
SharedThreadData *shared_thread_data;
- gboolean stopped;
+ /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+ volatile gint stopped;
/* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
* only affects messages received from the other peer (since GDBusServer is the
GSocket *socket;
/* used for reading */
- GMutex *read_lock;
+ GMutex read_lock;
gchar *read_buffer;
gsize read_buffer_allocated_size;
gsize read_buffer_cur_size;
*/
gboolean output_pending;
/* used for writing */
- GMutex *write_lock;
+ GMutex write_lock;
/* queue of MessageToWriteData, protected by write_lock */
GQueue *write_queue;
/* protected by write_lock */
typedef struct
{
- GMutex *mutex;
- GCond *cond;
+ GMutex mutex;
+ GCond cond;
guint64 number_to_wait_for;
GError *error;
} FlushData;
g_object_unref (worker->stream);
- g_mutex_free (worker->read_lock);
+ g_mutex_clear (&worker->read_lock);
g_object_unref (worker->cancellable);
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
g_queue_free (worker->received_messages_while_frozen);
- g_mutex_free (worker->write_lock);
+ g_mutex_clear (&worker->write_lock);
g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (worker->write_queue);
gboolean remote_peer_vanished,
GError *error)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
}
_g_dbus_worker_emit_message_received (GDBusWorker *worker,
GDBusMessage *message)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->message_received_callback (worker, message, worker->user_data);
}
GDBusMessage *message)
{
GDBusMessage *ret;
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
else
ret = message;
GDBusWorker *worker = user_data;
GDBusMessage *message;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
if (worker->frozen)
{
while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
{
g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
}
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
return FALSE;
}
GError *error;
gssize bytes_read;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
/* If already stopped, don't even process the reply */
- if (worker->stopped)
+ if (g_atomic_int_get (&worker->stopped))
goto out;
error = NULL;
if (bytes_read == -1)
{
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+ /* Every async read that uses this callback uses worker->cancellable
+ * as its GCancellable. worker->cancellable gets cancelled if and only
+ * if the GDBusConnection tells us to close (either via
+ * _g_dbus_worker_stop, which is called on last-unref, or directly),
+ * so a cancelled read must mean our connection was closed locally.
+ */
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+ _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+ else
+ _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
g_error_free (error);
goto out;
}
}
out:
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
_g_dbus_worker_unref (worker);
_g_dbus_worker_do_initial_read (gpointer data)
{
GDBusWorker *worker = data;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
_g_dbus_worker_do_read_unlocked (worker);
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
return FALSE;
}
f->error = error != NULL ? g_error_copy (error) : NULL;
- g_mutex_lock (f->mutex);
- g_cond_signal (f->cond);
- g_mutex_unlock (f->mutex);
+ g_mutex_lock (&f->mutex);
+ g_cond_signal (&f->cond);
+ g_mutex_unlock (&f->mutex);
}
}
/* Make sure we tell folks that we don't have additional
flushes pending */
- g_mutex_lock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending);
data->worker->output_pending = FALSE;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_unlock (&data->worker->write_lock);
/* OK, cool, finally kick off the next write */
maybe_write_next_message (data->worker);
/* then first wake up pending flushes and, if needed, flush the stream */
flushers = NULL;
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
worker->write_num_messages_written += 1;
for (l = worker->write_pending_flushes; l != NULL; l = ll)
{
g_assert (!worker->output_pending);
worker->output_pending = TRUE;
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
if (flushers != NULL)
{
MessageToWriteData *data = user_data;
GError *error;
- g_mutex_lock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
g_assert (data->worker->output_pending);
data->worker->output_pending = FALSE;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_unlock (&data->worker->write_lock);
error = NULL;
if (!write_message_finish (res, &error))
g_io_stream_close_finish (worker->stream, res, &error);
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
pending_close_attempts = worker->pending_close_attempts;
worker->pending_close_attempts = NULL;
g_assert (worker->output_pending);
worker->output_pending = FALSE;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
while (pending_close_attempts != NULL)
{
/* we mustn't try to write two things at once */
g_assert (!worker->output_pending);
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
/* if we want to close the connection, that takes precedence */
if (worker->pending_close_attempts != NULL)
worker->output_pending = TRUE;
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue
* and @output_pending fields of the GDBusWorker struct ... which we
else if (data->message == NULL)
{
/* filters dropped message */
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
worker->output_pending = FALSE;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
message_to_write_data_free (data);
goto write_next;
}
MessageToWriteData *write_data,
CloseData *close_data)
{
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
if (write_data != NULL)
g_queue_push_tail (worker->write_queue, write_data);
g_source_unref (idle_source);
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
worker = g_new0 (GDBusWorker, 1);
worker->ref_count = 1;
- worker->read_lock = g_mutex_new ();
+ g_mutex_init (&worker->read_lock);
worker->message_received_callback = message_received_callback;
worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
worker->disconnected_callback = disconnected_callback;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
- worker->write_lock = g_mutex_new ();
+ g_mutex_init (&worker->write_lock);
worker->write_queue = g_queue_new ();
if (G_IS_SOCKET_CONNECTION (worker->stream))
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
_g_dbus_worker_do_initial_read,
- worker,
- NULL);
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- worker->stopped = TRUE;
+ g_atomic_int_set (&worker->stopped, TRUE);
/* Cancel any pending operations and schedule a close of the underlying I/O
* stream in the worker thread
ret = TRUE;
/* if the queue is empty, there's nothing to wait for */
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
if (g_queue_get_length (worker->write_queue) > 0)
{
data = g_new0 (FlushData, 1);
- data->mutex = g_mutex_new ();
- data->cond = g_cond_new ();
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
- g_mutex_lock (data->mutex);
+ g_mutex_lock (&data->mutex);
worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
if (data != NULL)
{
- g_cond_wait (data->cond, data->mutex);
- g_mutex_unlock (data->mutex);
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
/* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
- g_cond_free (data->cond);
- g_mutex_free (data->mutex);
+ g_cond_clear (&data->cond);
+ g_mutex_clear (&data->mutex);
if (data->error != NULL)
{
ret = FALSE;