#include "glibintl.h"
+static gboolean _g_dbus_worker_do_initial_read (gpointer data);
+
/* ---------------------------------------------------------------------------------------------------- */
gchar *
}
static void
-released_required_types (void)
+release_required_types (void)
{
g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
g_ptr_array_unref (ensured_classes);
}
/* ---------------------------------------------------------------------------------------------------- */
-G_LOCK_DEFINE_STATIC (shared_thread_lock);
-
typedef struct
{
- gint num_users;
+ volatile gint refcount;
GThread *thread;
GMainContext *context;
GMainLoop *loop;
} SharedThreadData;
-static SharedThreadData *shared_thread_data = NULL;
-
static gpointer
-gdbus_shared_thread_func (gpointer data)
+gdbus_shared_thread_func (gpointer user_data)
{
- g_main_context_push_thread_default (shared_thread_data->context);
- g_main_loop_run (shared_thread_data->loop);
- g_main_context_pop_thread_default (shared_thread_data->context);
- return NULL;
-}
+ SharedThreadData *data = user_data;
-typedef void (*GDBusSharedThreadFunc) (gpointer user_data);
+ g_main_context_push_thread_default (data->context);
+ g_main_loop_run (data->loop);
+ g_main_context_pop_thread_default (data->context);
-typedef struct
-{
- GDBusSharedThreadFunc func;
- gpointer user_data;
- gboolean done;
-} CallerData;
+ release_required_types ();
-static gboolean
-invoke_caller (gpointer user_data)
-{
- CallerData *data = user_data;
- data->func (data->user_data);
- data->done = TRUE;
- return FALSE;
+ return NULL;
}
/* ---------------------------------------------------------------------------------------------------- */
-static void
-_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
- gpointer user_data)
+static SharedThreadData *
+_g_dbus_shared_thread_ref (void)
{
- GError *error;
- GSource *idle_source;
- CallerData *data;
- gboolean release_types;
+ static gsize shared_thread_data = 0;
+ GError *error = NULL;
+ SharedThreadData *ret;
- G_LOCK (shared_thread_lock);
-
- release_types = FALSE;
-
- if (shared_thread_data != NULL)
+ if (g_once_init_enter (&shared_thread_data))
{
- shared_thread_data->num_users += 1;
- goto have_thread;
+ SharedThreadData *data;
+
+ /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+ ensure_required_types ();
+
+ data = g_new0 (SharedThreadData, 1);
+ data->refcount = 0;
+
+ data->context = g_main_context_new ();
+ data->loop = g_main_loop_new (data->context, FALSE);
+ data->thread = g_thread_new ("gdbus",
+ gdbus_shared_thread_func,
+ data,
+ TRUE,
+ &error);
+ g_assert_no_error (error);
+ /* We can cast between gsize and gpointer safely */
+ g_once_init_leave (&shared_thread_data, (gsize) data);
}
- shared_thread_data = g_new0 (SharedThreadData, 1);
- shared_thread_data->num_users = 1;
-
- /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
- ensure_required_types ();
- release_types = TRUE;
-
- error = NULL;
- shared_thread_data->context = g_main_context_new ();
- shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
- shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
- NULL,
- TRUE,
- &error);
- g_assert_no_error (error);
-
- have_thread:
-
- data = g_new0 (CallerData, 1);
- data->func = func;
- data->user_data = user_data;
- data->done = FALSE;
-
- idle_source = g_idle_source_new ();
- g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
- g_source_set_callback (idle_source,
- invoke_caller,
- data,
- NULL);
- g_source_attach (idle_source, shared_thread_data->context);
- g_source_unref (idle_source);
-
- /* wait for the user code to run.. hmm.. probably use a condition variable instead */
- while (!data->done)
- g_thread_yield ();
-
- if (release_types)
- released_required_types ();
-
- g_free (data);
-
- G_UNLOCK (shared_thread_lock);
+ ret = (SharedThreadData*) shared_thread_data;
+ g_atomic_int_inc (&ret->refcount);
+ return ret;
}
static void
-_g_dbus_shared_thread_unref (void)
+_g_dbus_shared_thread_unref (SharedThreadData *data)
{
/* TODO: actually destroy the shared thread here */
#if 0
- G_LOCK (shared_thread_lock);
- g_assert (shared_thread_data != NULL);
- shared_thread_data->num_users -= 1;
- if (shared_thread_data->num_users == 0)
- {
- g_main_loop_quit (shared_thread_data->loop);
- //g_thread_join (shared_thread_data->thread);
- g_main_loop_unref (shared_thread_data->loop);
- g_main_context_unref (shared_thread_data->context);
- g_free (shared_thread_data);
- shared_thread_data = NULL;
- G_UNLOCK (shared_thread_lock);
- }
- else
+ g_assert (data != NULL);
+ if (g_atomic_int_dec_and_test (&data->refcount))
{
- G_UNLOCK (shared_thread_lock);
+ g_main_loop_quit (data->loop);
+ //g_thread_join (data->thread);
+ g_main_loop_unref (data->loop);
+ g_main_context_unref (data->context);
}
#endif
}
{
volatile gint ref_count;
- gboolean stopped;
+ SharedThreadData *shared_thread_data;
+
+ /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+ volatile gint stopped;
/* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
* only affects messages received from the other peer (since GDBusServer is the
* only user) - we might want it to affect messages sent to the other peer too?
*/
gboolean frozen;
+ GDBusCapabilityFlags capabilities;
GQueue *received_messages_while_frozen;
GIOStream *stream;
- GDBusCapabilityFlags capabilities;
GCancellable *cancellable;
GDBusWorkerMessageReceivedCallback message_received_callback;
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
GDBusWorkerDisconnectedCallback disconnected_callback;
gpointer user_data;
- GThread *thread;
-
/* if not NULL, stream is GSocketConnection */
GSocket *socket;
/* used for reading */
- GMutex *read_lock;
+ GMutex read_lock;
gchar *read_buffer;
gsize read_buffer_allocated_size;
gsize read_buffer_cur_size;
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
+ /* TRUE if an async write, flush or close is pending.
+ * Only the worker thread may change its value, and only with the write_lock.
+ * Other threads may read its value when holding the write_lock.
+ * The worker thread may read its value at any time.
+ */
+ gboolean output_pending;
/* used for writing */
- GMutex *write_lock;
+ GMutex write_lock;
+ /* queue of MessageToWriteData, protected by write_lock */
GQueue *write_queue;
- gint num_writes_pending;
+ /* protected by write_lock */
guint64 write_num_messages_written;
+ /* list of FlushData, protected by write_lock */
GList *write_pending_flushes;
- gboolean flush_pending;
+ /* list of CloseData, protected by write_lock */
+ GList *pending_close_attempts;
};
+static void _g_dbus_worker_unref (GDBusWorker *worker);
+
/* ---------------------------------------------------------------------------------------------------- */
typedef struct
{
- GMutex *mutex;
- GCond *cond;
+ GMutex mutex;
+ GCond cond;
guint64 number_to_wait_for;
GError *error;
} FlushData;
static void write_message_print_transport_debug (gssize bytes_written,
MessageToWriteData *data);
+typedef struct {
+ GDBusWorker *worker;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseData;
+
+static void close_data_free (CloseData *close_data)
+{
+ if (close_data->cancellable != NULL)
+ g_object_unref (close_data->cancellable);
+
+ if (close_data->result != NULL)
+ g_object_unref (close_data->result);
+
+ _g_dbus_worker_unref (close_data->worker);
+ g_slice_free (CloseData, close_data);
+}
+
/* ---------------------------------------------------------------------------------------------------- */
static GDBusWorker *
{
g_assert (worker->write_pending_flushes == NULL);
- _g_dbus_shared_thread_unref ();
+ _g_dbus_shared_thread_unref (worker->shared_thread_data);
g_object_unref (worker->stream);
- g_mutex_free (worker->read_lock);
+ g_mutex_clear (&worker->read_lock);
g_object_unref (worker->cancellable);
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
g_queue_free (worker->received_messages_while_frozen);
- g_mutex_free (worker->write_lock);
+ g_mutex_clear (&worker->write_lock);
g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (worker->write_queue);
gboolean remote_peer_vanished,
GError *error)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
}
_g_dbus_worker_emit_message_received (GDBusWorker *worker,
GDBusMessage *message)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->message_received_callback (worker, message, worker->user_data);
}
GDBusMessage *message)
{
GDBusMessage *ret;
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
else
ret = message;
GDBusWorker *worker = user_data;
GDBusMessage *message;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
if (worker->frozen)
{
while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
{
g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
}
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
return FALSE;
}
unfreeze_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
- g_source_attach (idle_source, shared_thread_data->context);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
GError *error;
gssize bytes_read;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
/* If already stopped, don't even process the reply */
- if (worker->stopped)
+ if (g_atomic_int_get (&worker->stopped))
goto out;
error = NULL;
}
out:
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
_g_dbus_worker_unref (worker);
}
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
-static void
-_g_dbus_worker_do_read (GDBusWorker *worker)
+static gboolean
+_g_dbus_worker_do_initial_read (gpointer data)
{
- g_mutex_lock (worker->read_lock);
+ GDBusWorker *worker = data;
+ g_mutex_lock (&worker->read_lock);
_g_dbus_worker_do_read_unlocked (worker);
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
+ return FALSE;
}
/* ---------------------------------------------------------------------------------------------------- */
static void write_message_continue_writing (MessageToWriteData *data);
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static void
write_message_async_cb (GObject *source_object,
GAsyncResult *res,
;
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static gboolean
on_socket_ready (GSocket *socket,
GIOCondition condition,
return FALSE; /* remove source */
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static void
write_message_continue_writing (MessageToWriteData *data)
{
;
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static void
write_message_async (GDBusWorker *worker,
MessageToWriteData *data,
GList *flushers;
} FlushAsyncData;
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+static void
+flush_data_list_complete (const GList *flushers,
+ const GError *error)
+{
+ const GList *l;
+
+ for (l = flushers; l != NULL; l = l->next)
+ {
+ FlushData *f = l->data;
+
+ f->error = error != NULL ? g_error_copy (error) : NULL;
+
+ g_mutex_lock (&f->mutex);
+ g_cond_signal (&f->cond);
+ g_mutex_unlock (&f->mutex);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static void
ostream_flush_cb (GObject *source_object,
GAsyncResult *res,
{
FlushAsyncData *data = user_data;
GError *error;
- GList *l;
error = NULL;
g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
}
g_assert (data->flushers != NULL);
- for (l = data->flushers; l != NULL; l = l->next)
- {
- FlushData *f = l->data;
-
- f->error = error != NULL ? g_error_copy (error) : NULL;
-
- g_mutex_lock (f->mutex);
- g_cond_signal (f->cond);
- g_mutex_unlock (f->mutex);
- }
+ flush_data_list_complete (data->flushers, error);
g_list_free (data->flushers);
if (error != NULL)
/* Make sure we tell folks that we don't have additional
flushes pending */
- g_mutex_lock (data->worker->write_lock);
- data->worker->flush_pending = FALSE;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (&data->worker->write_lock);
/* OK, cool, finally kick off the next write */
maybe_write_next_message (data->worker);
g_free (data);
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is false on entry
+ */
static void
message_written (GDBusWorker *worker,
MessageToWriteData *message_data)
/* then first wake up pending flushes and, if needed, flush the stream */
flushers = NULL;
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
worker->write_num_messages_written += 1;
for (l = worker->write_pending_flushes; l != NULL; l = ll)
{
}
if (flushers != NULL)
{
- worker->flush_pending = TRUE;
+ g_assert (!worker->output_pending);
+ worker->output_pending = TRUE;
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
if (flushers != NULL)
{
}
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
static void
write_message_cb (GObject *source_object,
GAsyncResult *res,
MessageToWriteData *data = user_data;
GError *error;
- g_mutex_lock (data->worker->write_lock);
- data->worker->num_writes_pending -= 1;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending);
+ data->worker->output_pending = FALSE;
+ g_mutex_unlock (&data->worker->write_lock);
error = NULL;
if (!write_message_finish (res, &error))
message_to_write_data_free (data);
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending is true on entry
+ */
+static void
+iostream_close_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GError *error = NULL;
+ GList *pending_close_attempts, *pending_flush_attempts;
+ GQueue *send_queue;
+
+ g_io_stream_close_finish (worker->stream, res, &error);
+
+ g_mutex_lock (&worker->write_lock);
+
+ pending_close_attempts = worker->pending_close_attempts;
+ worker->pending_close_attempts = NULL;
+
+ pending_flush_attempts = worker->write_pending_flushes;
+ worker->write_pending_flushes = NULL;
+
+ send_queue = worker->write_queue;
+ worker->write_queue = g_queue_new ();
+
+ g_assert (worker->output_pending);
+ worker->output_pending = FALSE;
+
+ g_mutex_unlock (&worker->write_lock);
+
+ while (pending_close_attempts != NULL)
+ {
+ CloseData *close_data = pending_close_attempts->data;
+
+ pending_close_attempts = g_list_delete_link (pending_close_attempts,
+ pending_close_attempts);
+
+ if (close_data->result != NULL)
+ {
+ if (error != NULL)
+ g_simple_async_result_set_from_error (close_data->result, error);
+
+ /* this must be in an idle because the result is likely to be
+ * intended for another thread
+ */
+ g_simple_async_result_complete_in_idle (close_data->result);
+ }
+
+ close_data_free (close_data);
+ }
+
+ g_clear_error (&error);
+
+ /* all messages queued for sending are discarded */
+ g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
+ g_queue_free (send_queue);
+
+ /* all queued flushes fail */
+ error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ flush_data_list_complete (pending_flush_attempts, error);
+ g_list_free (pending_flush_attempts);
+ g_clear_error (&error);
+
+ _g_dbus_worker_unref (worker);
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending must be false on entry
+ */
static void
maybe_write_next_message (GDBusWorker *worker)
{
MessageToWriteData *data;
write_next:
+ /* we mustn't try to write two things at once */
+ g_assert (!worker->output_pending);
- g_mutex_lock (worker->write_lock);
- data = g_queue_pop_head (worker->write_queue);
- if (data != NULL)
- worker->num_writes_pending += 1;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
+
+ /* if we want to close the connection, that takes precedence */
+ if (worker->pending_close_attempts != NULL)
+ {
+ worker->output_pending = TRUE;
+
+ g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
+ NULL, iostream_close_cb,
+ _g_dbus_worker_ref (worker));
+ data = NULL;
+ }
+ else
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = TRUE;
+ }
+
+ g_mutex_unlock (&worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue
- * and @num_writes_pending fields of the GDBusWorker struct ... which we
+ * and @output_pending fields of the GDBusWorker struct ... which we
* need to modify from arbitrary threads in _g_dbus_worker_send_message().
*
* Therefore, it's fine to drop it here when calling back into user
else if (data->message == NULL)
{
/* filters dropped message */
- g_mutex_lock (worker->write_lock);
- worker->num_writes_pending -= 1;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
+ worker->output_pending = FALSE;
+ g_mutex_unlock (&worker->write_lock);
message_to_write_data_free (data);
goto write_next;
}
}
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is not held on entry
+ * output_pending may be true or false
+ */
static gboolean
write_message_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
- if (worker->num_writes_pending == 0 && !worker->flush_pending)
+
+ /* Because this is the worker thread, we can read this struct member
+ * without holding the lock: no other thread ever modifies it.
+ */
+ if (!worker->output_pending)
maybe_write_next_message (worker);
+
return FALSE;
}
+/*
+ * @write_data: (transfer full) (allow-none):
+ * @close_data: (transfer full) (allow-none):
+ *
+ * Can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+static void
+schedule_write_in_worker_thread (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ CloseData *close_data)
+{
+ g_mutex_lock (&worker->write_lock);
+
+ if (write_data != NULL)
+ g_queue_push_tail (worker->write_queue, write_data);
+
+ if (close_data != NULL)
+ worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
+ close_data);
+
+ if (!worker->output_pending)
+ {
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ write_message_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
+ }
+
+ g_mutex_unlock (&worker->write_lock);
+}
+
/* ---------------------------------------------------------------------------------------------------- */
-/* can be called from any thread - steals blob */
+/* can be called from any thread - steals blob
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
void
_g_dbus_worker_send_message (GDBusWorker *worker,
GDBusMessage *message,
data->blob = blob; /* steal! */
data->blob_size = blob_len;
- g_mutex_lock (worker->write_lock);
- g_queue_push_tail (worker->write_queue, data);
- if (worker->num_writes_pending == 0)
- {
- GSource *idle_source;
- idle_source = g_idle_source_new ();
- g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
- g_source_set_callback (idle_source,
- write_message_in_idle_cb,
- _g_dbus_worker_ref (worker),
- (GDestroyNotify) _g_dbus_worker_unref);
- g_source_attach (idle_source, shared_thread_data->context);
- g_source_unref (idle_source);
- }
- g_mutex_unlock (worker->write_lock);
+ schedule_write_in_worker_thread (worker, data, NULL);
}
/* ---------------------------------------------------------------------------------------------------- */
-static void
-_g_dbus_worker_thread_begin_func (gpointer user_data)
-{
- GDBusWorker *worker = user_data;
-
- worker->thread = g_thread_self ();
-
- /* begin reading */
- _g_dbus_worker_do_read (worker);
-}
-
GDBusWorker *
_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
gpointer user_data)
{
GDBusWorker *worker;
+ GSource *idle_source;
g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
g_return_val_if_fail (message_received_callback != NULL, NULL);
worker = g_new0 (GDBusWorker, 1);
worker->ref_count = 1;
- worker->read_lock = g_mutex_new ();
+ g_mutex_init (&worker->read_lock);
worker->message_received_callback = message_received_callback;
worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
worker->disconnected_callback = disconnected_callback;
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
- worker->flush_pending = FALSE;
+ worker->output_pending = FALSE;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
- worker->write_lock = g_mutex_new ();
+ g_mutex_init (&worker->write_lock);
worker->write_queue = g_queue_new ();
if (G_IS_SOCKET_CONNECTION (worker->stream))
worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
- _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker);
+ worker->shared_thread_data = _g_dbus_shared_thread_ref ();
+
+ /* begin reading */
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ _g_dbus_worker_do_initial_read,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, worker->shared_thread_data->context);
+ g_source_unref (idle_source);
return worker;
}
/* ---------------------------------------------------------------------------------------------------- */
+/* can be called from any thread
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
+ */
+void
+_g_dbus_worker_close (GDBusWorker *worker,
+ GCancellable *cancellable,
+ GSimpleAsyncResult *result)
+{
+ CloseData *close_data;
+
+ close_data = g_slice_new0 (CloseData);
+ close_data->worker = _g_dbus_worker_ref (worker);
+ close_data->cancellable =
+ (cancellable == NULL ? NULL : g_object_ref (cancellable));
+ close_data->result = (result == NULL ? NULL : g_object_ref (result));
+
+ g_cancellable_cancel (worker->cancellable);
+ schedule_write_in_worker_thread (worker, NULL, close_data);
+}
+
/* This can be called from any thread - frees worker. Note that
* callbacks might still happen if called from another thread than the
* worker - use your own synchronization primitive in the callbacks.
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- worker->stopped = TRUE;
- g_cancellable_cancel (worker->cancellable);
+ g_atomic_int_set (&worker->stopped, TRUE);
+
+ /* Cancel any pending operations and schedule a close of the underlying I/O
+ * stream in the worker thread
+ */
+ _g_dbus_worker_close (worker, NULL, NULL);
+
+ /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+ * thread has run, so we no longer need to unref in an idle like in
+ * commit 322e25b535
+ */
_g_dbus_worker_unref (worker);
}
/* can be called from any thread (except the worker thread) - blocks
* calling thread until all queued outgoing messages are written and
* the transport has been flushed
+ *
+ * write_lock is not held on entry
+ * output_pending may be true or false
*/
gboolean
_g_dbus_worker_flush_sync (GDBusWorker *worker,
ret = TRUE;
/* if the queue is empty, there's nothing to wait for */
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
if (g_queue_get_length (worker->write_queue) > 0)
{
data = g_new0 (FlushData, 1);
- data->mutex = g_mutex_new ();
- data->cond = g_cond_new ();
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
- g_mutex_lock (data->mutex);
+ g_mutex_lock (&data->mutex);
worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
if (data != NULL)
{
- g_cond_wait (data->cond, data->mutex);
- g_mutex_unlock (data->mutex);
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
/* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
- g_cond_free (data->cond);
- g_mutex_free (data->mutex);
+ g_cond_clear (&data->cond);
+ g_mutex_clear (&data->mutex);
if (data->error != NULL)
{
ret = FALSE;
const gchar *debug;
g_dbus_error_domain = G_DBUS_ERROR;
+ (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
debug = g_getenv ("G_DBUS_DEBUG");
if (debug != NULL)
out:
;
}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+gboolean
+_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
+ GValue *return_accu,
+ const GValue *handler_return,
+ gpointer dummy)
+{
+ gboolean continue_emission;
+ gboolean signal_return;
+
+ signal_return = g_value_get_boolean (handler_return);
+ g_value_set_boolean (return_accu, signal_return);
+ continue_emission = signal_return;
+
+ return continue_emission;
+}