X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgdbusprivate.c;h=5584d518d37e4dca4809ef01445e9f13b18e87bb;hb=0d1a92ca3d234a4291ef3ecbf7df2d57442a63e5;hp=442d5e1409b427eb0a266cccdd3c0379dda5ba57;hpb=f0b04acfd31b768151a88db3f8d3347f55b2a7b3;p=platform%2Fupstream%2Fglib.git diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 442d5e1..5584d51 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -55,6 +55,8 @@ #include "glibintl.h" +static gboolean _g_dbus_worker_do_initial_read (gpointer data); + /* ---------------------------------------------------------------------------------------------------- */ gchar * @@ -153,8 +155,7 @@ _g_socket_read_with_control_messages_ready (GSocket *socket, else { g_assert (error != NULL); - g_simple_async_result_set_from_error (data->simple, error); - g_error_free (error); + g_simple_async_result_take_error (data->simple, error); } if (data->from_mainloop) @@ -241,7 +242,7 @@ ensure_type (GType gtype) } static void -released_required_types (void) +release_required_types (void) { g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL); g_ptr_array_unref (ensured_classes); @@ -258,131 +259,76 @@ ensure_required_types (void) } /* ---------------------------------------------------------------------------------------------------- */ -G_LOCK_DEFINE_STATIC (shared_thread_lock); - typedef struct { - gint num_users; + volatile gint refcount; GThread *thread; GMainContext *context; GMainLoop *loop; } SharedThreadData; -static SharedThreadData *shared_thread_data = NULL; - static gpointer -gdbus_shared_thread_func (gpointer data) +gdbus_shared_thread_func (gpointer user_data) { - g_main_context_push_thread_default (shared_thread_data->context); - g_main_loop_run (shared_thread_data->loop); - g_main_context_pop_thread_default (shared_thread_data->context); - return NULL; -} + SharedThreadData *data = user_data; -typedef void (*GDBusSharedThreadFunc) (gpointer user_data); + g_main_context_push_thread_default (data->context); + g_main_loop_run (data->loop); + g_main_context_pop_thread_default (data->context); -typedef struct -{ - GDBusSharedThreadFunc func; - gpointer user_data; - gboolean done; -} CallerData; + release_required_types (); -static gboolean -invoke_caller (gpointer user_data) -{ - CallerData *data = user_data; - data->func (data->user_data); - data->done = TRUE; - return FALSE; + return NULL; } /* ---------------------------------------------------------------------------------------------------- */ -static void -_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func, - gpointer user_data) +static SharedThreadData * +_g_dbus_shared_thread_ref (void) { - GError *error; - GSource *idle_source; - CallerData *data; - gboolean release_types; + static gsize shared_thread_data = 0; + GError *error = NULL; + SharedThreadData *ret; - G_LOCK (shared_thread_lock); - - release_types = FALSE; - - if (shared_thread_data != NULL) + if (g_once_init_enter (&shared_thread_data)) { - shared_thread_data->num_users += 1; - goto have_thread; + SharedThreadData *data; + + /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ + ensure_required_types (); + + data = g_new0 (SharedThreadData, 1); + data->refcount = 0; + + data->context = g_main_context_new (); + data->loop = g_main_loop_new (data->context, FALSE); + data->thread = g_thread_new ("gdbus", + gdbus_shared_thread_func, + data, + TRUE, + &error); + g_assert_no_error (error); + /* We can cast between gsize and gpointer safely */ + g_once_init_leave (&shared_thread_data, (gsize) data); } - shared_thread_data = g_new0 (SharedThreadData, 1); - shared_thread_data->num_users = 1; - - /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ - ensure_required_types (); - release_types = TRUE; - - error = NULL; - shared_thread_data->context = g_main_context_new (); - shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE); - shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func, - NULL, - TRUE, - &error); - g_assert_no_error (error); - - have_thread: - - data = g_new0 (CallerData, 1); - data->func = func; - data->user_data = user_data; - data->done = FALSE; - - idle_source = g_idle_source_new (); - g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); - g_source_set_callback (idle_source, - invoke_caller, - data, - NULL); - g_source_attach (idle_source, shared_thread_data->context); - g_source_unref (idle_source); - - /* wait for the user code to run.. hmm.. probably use a condition variable instead */ - while (!data->done) - g_thread_yield (); - - if (release_types) - released_required_types (); - - g_free (data); - - G_UNLOCK (shared_thread_lock); + ret = (SharedThreadData*) shared_thread_data; + g_atomic_int_inc (&ret->refcount); + return ret; } static void -_g_dbus_shared_thread_unref (void) +_g_dbus_shared_thread_unref (SharedThreadData *data) { /* TODO: actually destroy the shared thread here */ #if 0 - G_LOCK (shared_thread_lock); - g_assert (shared_thread_data != NULL); - shared_thread_data->num_users -= 1; - if (shared_thread_data->num_users == 0) + g_assert (data != NULL); + if (g_atomic_int_dec_and_test (&data->refcount)) { - g_main_loop_quit (shared_thread_data->loop); - //g_thread_join (shared_thread_data->thread); - g_main_loop_unref (shared_thread_data->loop); - g_main_context_unref (shared_thread_data->context); - g_free (shared_thread_data); - shared_thread_data = NULL; - G_UNLOCK (shared_thread_lock); - } - else - { - G_UNLOCK (shared_thread_lock); + g_main_loop_quit (data->loop); + //g_thread_join (data->thread); + g_main_loop_unref (data->loop); + g_main_context_unref (data->context); } #endif } @@ -393,25 +339,26 @@ struct GDBusWorker { volatile gint ref_count; - gboolean stopped; + SharedThreadData *shared_thread_data; + + /* really a boolean, but GLib 2.28 lacks atomic boolean ops */ + volatile gint stopped; /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently * only affects messages received from the other peer (since GDBusServer is the * only user) - we might want it to affect messages sent to the other peer too? */ gboolean frozen; + GDBusCapabilityFlags capabilities; GQueue *received_messages_while_frozen; GIOStream *stream; - GDBusCapabilityFlags capabilities; GCancellable *cancellable; GDBusWorkerMessageReceivedCallback message_received_callback; GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback; GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; - GThread *thread; - /* if not NULL, stream is GSocketConnection */ GSocket *socket; @@ -425,14 +372,26 @@ struct GDBusWorker GSocketControlMessage **read_ancillary_messages; gint read_num_ancillary_messages; + /* TRUE if an async write, flush or close is pending. + * Only the worker thread may change its value, and only with the write_lock. + * Other threads may read its value when holding the write_lock. + * The worker thread may read its value at any time. + */ + gboolean output_pending; /* used for writing */ GMutex *write_lock; + /* queue of MessageToWriteData, protected by write_lock */ GQueue *write_queue; - gint num_writes_pending; + /* protected by write_lock */ guint64 write_num_messages_written; + /* list of FlushData, protected by write_lock */ GList *write_pending_flushes; + /* list of CloseData, protected by write_lock */ + GList *pending_close_attempts; }; +static void _g_dbus_worker_unref (GDBusWorker *worker); + /* ---------------------------------------------------------------------------------------------------- */ typedef struct @@ -454,6 +413,24 @@ static void read_message_print_transport_debug (gssize bytes_read, static void write_message_print_transport_debug (gssize bytes_written, MessageToWriteData *data); +typedef struct { + GDBusWorker *worker; + GCancellable *cancellable; + GSimpleAsyncResult *result; +} CloseData; + +static void close_data_free (CloseData *close_data) +{ + if (close_data->cancellable != NULL) + g_object_unref (close_data->cancellable); + + if (close_data->result != NULL) + g_object_unref (close_data->result); + + _g_dbus_worker_unref (close_data->worker); + g_slice_free (CloseData, close_data); +} + /* ---------------------------------------------------------------------------------------------------- */ static GDBusWorker * @@ -470,7 +447,7 @@ _g_dbus_worker_unref (GDBusWorker *worker) { g_assert (worker->write_pending_flushes == NULL); - _g_dbus_shared_thread_unref (); + _g_dbus_shared_thread_unref (worker->shared_thread_data); g_object_unref (worker->stream); @@ -497,7 +474,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker *worker, gboolean remote_peer_vanished, GError *error) { - if (!worker->stopped) + if (!g_atomic_int_get (&worker->stopped)) worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data); } @@ -505,7 +482,7 @@ static void _g_dbus_worker_emit_message_received (GDBusWorker *worker, GDBusMessage *message) { - if (!worker->stopped) + if (!g_atomic_int_get (&worker->stopped)) worker->message_received_callback (worker, message, worker->user_data); } @@ -514,7 +491,7 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, GDBusMessage *message) { GDBusMessage *ret; - if (!worker->stopped) + if (!g_atomic_int_get (&worker->stopped)) ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data); else ret = message; @@ -575,7 +552,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker) unfreeze_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); - g_source_attach (idle_source, shared_thread_data->context); + g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } @@ -596,7 +573,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, g_mutex_lock (worker->read_lock); /* If already stopped, don't even process the reply */ - if (worker->stopped) + if (g_atomic_int_get (&worker->stopped)) goto out; error = NULL; @@ -850,12 +827,14 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) } /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ -static void -_g_dbus_worker_do_read (GDBusWorker *worker) +static gboolean +_g_dbus_worker_do_initial_read (gpointer data) { + GDBusWorker *worker = data; g_mutex_lock (worker->read_lock); _g_dbus_worker_do_read_unlocked (worker); g_mutex_unlock (worker->read_lock); + return FALSE; } /* ---------------------------------------------------------------------------------------------------- */ @@ -876,7 +855,8 @@ static void message_to_write_data_free (MessageToWriteData *data) { _g_dbus_worker_unref (data->worker); - g_object_unref (data->message); + if (data->message) + g_object_unref (data->message); g_free (data->blob); g_free (data); } @@ -885,7 +865,11 @@ message_to_write_data_free (MessageToWriteData *data) static void write_message_continue_writing (MessageToWriteData *data); -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static void write_message_async_cb (GObject *source_object, GAsyncResult *res, @@ -907,8 +891,7 @@ write_message_async_cb (GObject *source_object, &error); if (bytes_written == -1) { - g_simple_async_result_set_from_error (simple, error); - g_error_free (error); + g_simple_async_result_take_error (simple, error); g_simple_async_result_complete (simple); g_object_unref (simple); goto out; @@ -932,7 +915,11 @@ write_message_async_cb (GObject *source_object, ; } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static gboolean on_socket_ready (GSocket *socket, GIOCondition condition, @@ -943,7 +930,11 @@ on_socket_ready (GSocket *socket, return FALSE; /* remove source */ } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static void write_message_continue_writing (MessageToWriteData *data) { @@ -1027,8 +1018,7 @@ write_message_continue_writing (MessageToWriteData *data) g_error_free (error); goto out; } - g_simple_async_result_set_from_error (simple, error); - g_error_free (error); + g_simple_async_result_take_error (simple, error); g_simple_async_result_complete (simple); g_object_unref (simple); goto out; @@ -1077,7 +1067,11 @@ write_message_continue_writing (MessageToWriteData *data) ; } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static void write_message_async (GDBusWorker *worker, MessageToWriteData *data, @@ -1113,7 +1107,29 @@ typedef struct GList *flushers; } FlushAsyncData; -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static void +flush_data_list_complete (const GList *flushers, + const GError *error) +{ + const GList *l; + + for (l = flushers; l != NULL; l = l->next) + { + FlushData *f = l->data; + + f->error = error != NULL ? g_error_copy (error) : NULL; + + g_mutex_lock (f->mutex); + g_cond_signal (f->cond); + g_mutex_unlock (f->mutex); + } +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static void ostream_flush_cb (GObject *source_object, GAsyncResult *res, @@ -1121,7 +1137,6 @@ ostream_flush_cb (GObject *source_object, { FlushAsyncData *data = user_data; GError *error; - GList *l; error = NULL; g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), @@ -1142,21 +1157,19 @@ ostream_flush_cb (GObject *source_object, } g_assert (data->flushers != NULL); - for (l = data->flushers; l != NULL; l = l->next) - { - FlushData *f = l->data; - - f->error = error != NULL ? g_error_copy (error) : NULL; - - g_mutex_lock (f->mutex); - g_cond_signal (f->cond); - g_mutex_unlock (f->mutex); - } + flush_data_list_complete (data->flushers, error); g_list_free (data->flushers); if (error != NULL) g_error_free (error); + /* Make sure we tell folks that we don't have additional + flushes pending */ + g_mutex_lock (data->worker->write_lock); + g_assert (data->worker->output_pending); + data->worker->output_pending = FALSE; + g_mutex_unlock (data->worker->write_lock); + /* OK, cool, finally kick off the next write */ maybe_write_next_message (data->worker); @@ -1164,7 +1177,11 @@ ostream_flush_cb (GObject *source_object, g_free (data); } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is false on entry + */ static void message_written (GDBusWorker *worker, MessageToWriteData *message_data) @@ -1209,6 +1226,11 @@ message_written (GDBusWorker *worker, worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); } } + if (flushers != NULL) + { + g_assert (!worker->output_pending); + worker->output_pending = TRUE; + } g_mutex_unlock (worker->write_lock); if (flushers != NULL) @@ -1231,7 +1253,11 @@ message_written (GDBusWorker *worker, } } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ static void write_message_cb (GObject *source_object, GAsyncResult *res, @@ -1241,7 +1267,8 @@ write_message_cb (GObject *source_object, GError *error; g_mutex_lock (data->worker->write_lock); - data->worker->num_writes_pending -= 1; + g_assert (data->worker->output_pending); + data->worker->output_pending = FALSE; g_mutex_unlock (data->worker->write_lock); error = NULL; @@ -1261,22 +1288,114 @@ write_message_cb (GObject *source_object, message_to_write_data_free (data); } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is true on entry + */ +static void +iostream_close_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GDBusWorker *worker = user_data; + GError *error = NULL; + GList *pending_close_attempts, *pending_flush_attempts; + GQueue *send_queue; + + g_io_stream_close_finish (worker->stream, res, &error); + + g_mutex_lock (worker->write_lock); + + pending_close_attempts = worker->pending_close_attempts; + worker->pending_close_attempts = NULL; + + pending_flush_attempts = worker->write_pending_flushes; + worker->write_pending_flushes = NULL; + + send_queue = worker->write_queue; + worker->write_queue = g_queue_new (); + + g_assert (worker->output_pending); + worker->output_pending = FALSE; + + g_mutex_unlock (worker->write_lock); + + while (pending_close_attempts != NULL) + { + CloseData *close_data = pending_close_attempts->data; + + pending_close_attempts = g_list_delete_link (pending_close_attempts, + pending_close_attempts); + + if (close_data->result != NULL) + { + if (error != NULL) + g_simple_async_result_set_from_error (close_data->result, error); + + /* this must be in an idle because the result is likely to be + * intended for another thread + */ + g_simple_async_result_complete_in_idle (close_data->result); + } + + close_data_free (close_data); + } + + g_clear_error (&error); + + /* all messages queued for sending are discarded */ + g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL); + g_queue_free (send_queue); + + /* all queued flushes fail */ + error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED, + _("Operation was cancelled")); + flush_data_list_complete (pending_flush_attempts, error); + g_list_free (pending_flush_attempts); + g_clear_error (&error); + + _g_dbus_worker_unref (worker); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending must be false on entry + */ static void maybe_write_next_message (GDBusWorker *worker) { MessageToWriteData *data; write_next: + /* we mustn't try to write two things at once */ + g_assert (!worker->output_pending); g_mutex_lock (worker->write_lock); - data = g_queue_pop_head (worker->write_queue); - if (data != NULL) - worker->num_writes_pending += 1; + + /* if we want to close the connection, that takes precedence */ + if (worker->pending_close_attempts != NULL) + { + worker->output_pending = TRUE; + + g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, + NULL, iostream_close_cb, + _g_dbus_worker_ref (worker)); + data = NULL; + } + else + { + data = g_queue_pop_head (worker->write_queue); + + if (data != NULL) + worker->output_pending = TRUE; + } + g_mutex_unlock (worker->write_lock); /* Note that write_lock is only used for protecting the @write_queue - * and @num_writes_pending fields of the GDBusWorker struct ... which we + * and @output_pending fields of the GDBusWorker struct ... which we * need to modify from arbitrary threads in _g_dbus_worker_send_message(). * * Therefore, it's fine to drop it here when calling back into user @@ -1300,7 +1419,7 @@ maybe_write_next_message (GDBusWorker *worker) { /* filters dropped message */ g_mutex_lock (worker->write_lock); - worker->num_writes_pending -= 1; + worker->output_pending = FALSE; g_mutex_unlock (worker->write_lock); message_to_write_data_free (data); goto write_next; @@ -1338,40 +1457,49 @@ maybe_write_next_message (GDBusWorker *worker) } } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending may be true or false + */ static gboolean write_message_in_idle_cb (gpointer user_data) { GDBusWorker *worker = user_data; - if (worker->num_writes_pending == 0) + + /* Because this is the worker thread, we can read this struct member + * without holding the lock: no other thread ever modifies it. + */ + if (!worker->output_pending) maybe_write_next_message (worker); + return FALSE; } -/* ---------------------------------------------------------------------------------------------------- */ - -/* can be called from any thread - steals blob */ -void -_g_dbus_worker_send_message (GDBusWorker *worker, - GDBusMessage *message, - gchar *blob, - gsize blob_len) +/* + * @write_data: (transfer full) (allow-none): + * @close_data: (transfer full) (allow-none): + * + * Can be called from any thread + * + * write_lock is not held on entry + * output_pending may be true or false + */ +static void +schedule_write_in_worker_thread (GDBusWorker *worker, + MessageToWriteData *write_data, + CloseData *close_data) { - MessageToWriteData *data; + g_mutex_lock (worker->write_lock); - g_return_if_fail (G_IS_DBUS_MESSAGE (message)); - g_return_if_fail (blob != NULL); - g_return_if_fail (blob_len > 16); + if (write_data != NULL) + g_queue_push_tail (worker->write_queue, write_data); - data = g_new0 (MessageToWriteData, 1); - data->worker = _g_dbus_worker_ref (worker); - data->message = g_object_ref (message); - data->blob = blob; /* steal! */ - data->blob_size = blob_len; + if (close_data != NULL) + worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts, + close_data); - g_mutex_lock (worker->write_lock); - g_queue_push_tail (worker->write_queue, data); - if (worker->num_writes_pending == 0) + if (!worker->output_pending) { GSource *idle_source; idle_source = g_idle_source_new (); @@ -1380,25 +1508,43 @@ _g_dbus_worker_send_message (GDBusWorker *worker, write_message_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); - g_source_attach (idle_source, shared_thread_data->context); + g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } + g_mutex_unlock (worker->write_lock); } /* ---------------------------------------------------------------------------------------------------- */ -static void -_g_dbus_worker_thread_begin_func (gpointer user_data) +/* can be called from any thread - steals blob + * + * write_lock is not held on entry + * output_pending may be true or false + */ +void +_g_dbus_worker_send_message (GDBusWorker *worker, + GDBusMessage *message, + gchar *blob, + gsize blob_len) { - GDBusWorker *worker = user_data; + MessageToWriteData *data; - worker->thread = g_thread_self (); + g_return_if_fail (G_IS_DBUS_MESSAGE (message)); + g_return_if_fail (blob != NULL); + g_return_if_fail (blob_len > 16); - /* begin reading */ - _g_dbus_worker_do_read (worker); + data = g_new0 (MessageToWriteData, 1); + data->worker = _g_dbus_worker_ref (worker); + data->message = g_object_ref (message); + data->blob = blob; /* steal! */ + data->blob_size = blob_len; + + schedule_write_in_worker_thread (worker, data, NULL); } +/* ---------------------------------------------------------------------------------------------------- */ + GDBusWorker * _g_dbus_worker_new (GIOStream *stream, GDBusCapabilityFlags capabilities, @@ -1409,6 +1555,7 @@ _g_dbus_worker_new (GIOStream *stream, gpointer user_data) { GDBusWorker *worker; + GSource *idle_source; g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL); g_return_val_if_fail (message_received_callback != NULL, NULL); @@ -1426,6 +1573,7 @@ _g_dbus_worker_new (GIOStream *stream, worker->stream = g_object_ref (stream); worker->capabilities = capabilities; worker->cancellable = g_cancellable_new (); + worker->output_pending = FALSE; worker->frozen = initially_frozen; worker->received_messages_while_frozen = g_queue_new (); @@ -1436,22 +1584,66 @@ _g_dbus_worker_new (GIOStream *stream, if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); - _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker); + worker->shared_thread_data = _g_dbus_shared_thread_ref (); + + /* begin reading */ + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + _g_dbus_worker_do_initial_read, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_attach (idle_source, worker->shared_thread_data->context); + g_source_unref (idle_source); return worker; } /* ---------------------------------------------------------------------------------------------------- */ +/* can be called from any thread + * + * write_lock is not held on entry + * output_pending may be true or false + */ +void +_g_dbus_worker_close (GDBusWorker *worker, + GCancellable *cancellable, + GSimpleAsyncResult *result) +{ + CloseData *close_data; + + close_data = g_slice_new0 (CloseData); + close_data->worker = _g_dbus_worker_ref (worker); + close_data->cancellable = + (cancellable == NULL ? NULL : g_object_ref (cancellable)); + close_data->result = (result == NULL ? NULL : g_object_ref (result)); + + g_cancellable_cancel (worker->cancellable); + schedule_write_in_worker_thread (worker, NULL, close_data); +} + /* This can be called from any thread - frees worker. Note that * callbacks might still happen if called from another thread than the * worker - use your own synchronization primitive in the callbacks. + * + * write_lock is not held on entry + * output_pending may be true or false */ void _g_dbus_worker_stop (GDBusWorker *worker) { - worker->stopped = TRUE; - g_cancellable_cancel (worker->cancellable); + g_atomic_int_set (&worker->stopped, TRUE); + + /* Cancel any pending operations and schedule a close of the underlying I/O + * stream in the worker thread + */ + _g_dbus_worker_close (worker, NULL, NULL); + + /* _g_dbus_worker_close holds a ref until after an idle in the the worker + * thread has run, so we no longer need to unref in an idle like in + * commit 322e25b535 + */ _g_dbus_worker_unref (worker); } @@ -1460,6 +1652,9 @@ _g_dbus_worker_stop (GDBusWorker *worker) /* can be called from any thread (except the worker thread) - blocks * calling thread until all queued outgoing messages are written and * the transport has been flushed + * + * write_lock is not held on entry + * output_pending may be true or false */ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, @@ -1622,6 +1817,7 @@ _g_dbus_initialize (void) const gchar *debug; g_dbus_error_domain = G_DBUS_ERROR; + (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */ debug = g_getenv ("G_DBUS_DEBUG"); if (debug != NULL) @@ -1858,3 +2054,21 @@ read_message_print_transport_debug (gssize bytes_read, out: ; } + +/* ---------------------------------------------------------------------------------------------------- */ + +gboolean +_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint, + GValue *return_accu, + const GValue *handler_return, + gpointer dummy) +{ + gboolean continue_emission; + gboolean signal_return; + + signal_return = g_value_get_boolean (handler_return); + g_value_set_boolean (return_accu, signal_return); + continue_emission = signal_return; + + return continue_emission; +}