X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgdbusprivate.c;h=91feb414e1b8699d51cfa595536e968e269a3a69;hb=51fac05d73f8363de821eb0d6940dedca13a8c0f;hp=d718a39a470efe1e36d225455cbab5f1ba4fc798;hpb=d344ff9d67a7e723a1c2163e7d9254dd6ef049d2;p=platform%2Fupstream%2Fglib.git diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index d718a39..91feb41 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -13,9 +13,7 @@ * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. + * Public License along with this library; if not, see . * * Author: David Zeuthen */ @@ -24,9 +22,6 @@ #include #include -#ifdef HAVE_UNISTD_H -#include -#endif #include "giotypes.h" #include "gsocket.h" @@ -37,11 +32,16 @@ #include "gasyncresult.h" #include "gsimpleasyncresult.h" #include "ginputstream.h" +#include "gmemoryinputstream.h" #include "giostream.h" +#include "glib/gstdio.h" #include "gsocketcontrolmessage.h" #include "gsocketconnection.h" +#include "gsocketoutputstream.h" #ifdef G_OS_UNIX +#include "gkdbus.h" +#include "gkdbusconnection.h" #include "gunixfdmessage.h" #include "gunixconnection.h" #include "gunixcredentialsmessage.h" @@ -53,6 +53,8 @@ #include "glibintl.h" +static gboolean _g_dbus_worker_do_initial_read (gpointer data); + /* ---------------------------------------------------------------------------------------------------- */ gchar * @@ -90,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent) /* ---------------------------------------------------------------------------------------------------- */ +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +typedef struct +{ + GKdbus *kdbus; + GCancellable *cancellable; + + GSimpleAsyncResult *simple; + + gboolean from_mainloop; +} ReadKdbusData; + +static void +read_kdbus_data_free (ReadKdbusData *data) +{ + g_object_unref (data->kdbus); + if (data->cancellable != NULL) + g_object_unref (data->cancellable); + g_object_unref (data->simple); + g_free (data); +} + +static gboolean +_g_kdbus_read_ready (GKdbus *kdbus, + GIOCondition condition, + gpointer user_data) +{ + ReadKdbusData *data = user_data; + GError *error = NULL; + gssize result; + + result = _g_kdbus_receive (data->kdbus, + data->cancellable, + &error); + + if (result >= 0) + { + g_simple_async_result_set_op_res_gssize (data->simple, result); + } + else + { + g_assert (error != NULL); + g_simple_async_result_take_error (data->simple, error); + } + + if (data->from_mainloop) + g_simple_async_result_complete (data->simple); + else + g_simple_async_result_complete_in_idle (data->simple); + + return FALSE; +} + +static void +_g_kdbus_read (GKdbus *kdbus, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + ReadKdbusData *data; + GSource *source; + + data = g_new0 (ReadKdbusData, 1); + data->kdbus = g_object_ref (kdbus); + data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL; + + data->simple = g_simple_async_result_new (G_OBJECT (kdbus), + callback, + user_data, + _g_kdbus_read); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); + + data->from_mainloop = TRUE; + source = _g_kdbus_create_source (data->kdbus, + G_IO_IN, + cancellable); + g_source_set_callback (source, + (GSourceFunc) _g_kdbus_read_ready, + data, + (GDestroyNotify) read_kdbus_data_free); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); +} + +static gssize +_g_kdbus_read_finish (GKdbus *kdbus, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (G_IS_KDBUS (kdbus), -1); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + else + return g_simple_async_result_get_op_res_gssize (simple); +} + +#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */ + /* Unfortunately ancillary messages are discarded when reading from a * socket using the GSocketInputStream abstraction. So we provide a * very GInputStream-ish API that uses GSocket in this case (very @@ -151,8 +254,7 @@ _g_socket_read_with_control_messages_ready (GSocket *socket, else { g_assert (error != NULL); - g_simple_async_result_set_from_error (data->simple, error); - g_error_free (error); + g_simple_async_result_take_error (data->simple, error); } if (data->from_mainloop) @@ -188,6 +290,7 @@ _g_socket_read_with_control_messages (GSocket *socket, callback, user_data, _g_socket_read_with_control_messages); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); if (!g_socket_condition_check (socket, G_IO_IN)) { @@ -228,153 +331,145 @@ _g_socket_read_with_control_messages_finish (GSocket *socket, /* ---------------------------------------------------------------------------------------------------- */ -G_LOCK_DEFINE_STATIC (shared_thread_lock); +/* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ + +static GPtrArray *ensured_classes = NULL; + +static void +ensure_type (GType gtype) +{ + g_ptr_array_add (ensured_classes, g_type_class_ref (gtype)); +} + +static void +release_required_types (void) +{ + g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL); + g_ptr_array_unref (ensured_classes); + ensured_classes = NULL; +} + +static void +ensure_required_types (void) +{ + g_assert (ensured_classes == NULL); + ensured_classes = g_ptr_array_new (); + ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT); + ensure_type (G_TYPE_MEMORY_INPUT_STREAM); +} +/* ---------------------------------------------------------------------------------------------------- */ typedef struct { - gint num_users; + volatile gint refcount; GThread *thread; GMainContext *context; GMainLoop *loop; } SharedThreadData; -static SharedThreadData *shared_thread_data = NULL; - static gpointer -shared_thread_func (gpointer data) +gdbus_shared_thread_func (gpointer user_data) { - g_main_context_push_thread_default (shared_thread_data->context); - g_main_loop_run (shared_thread_data->loop); - g_main_context_pop_thread_default (shared_thread_data->context); - return NULL; -} + SharedThreadData *data = user_data; -typedef void (*GDBusSharedThreadFunc) (gpointer user_data); + g_main_context_push_thread_default (data->context); + g_main_loop_run (data->loop); + g_main_context_pop_thread_default (data->context); -typedef struct -{ - GDBusSharedThreadFunc func; - gpointer user_data; - gboolean done; -} CallerData; + release_required_types (); -static gboolean -invoke_caller (gpointer user_data) -{ - CallerData *data = user_data; - data->func (data->user_data); - data->done = TRUE; - return FALSE; + return NULL; } -static void -_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func, - gpointer user_data) -{ - GError *error; - GSource *idle_source; - CallerData *data; +/* ---------------------------------------------------------------------------------------------------- */ - G_LOCK (shared_thread_lock); +static SharedThreadData * +_g_dbus_shared_thread_ref (void) +{ + static gsize shared_thread_data = 0; + SharedThreadData *ret; - if (shared_thread_data != NULL) + if (g_once_init_enter (&shared_thread_data)) { - shared_thread_data->num_users += 1; - goto have_thread; + SharedThreadData *data; + + /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ + ensure_required_types (); + + data = g_new0 (SharedThreadData, 1); + data->refcount = 0; + + data->context = g_main_context_new (); + data->loop = g_main_loop_new (data->context, FALSE); + data->thread = g_thread_new ("gdbus", + gdbus_shared_thread_func, + data); + /* We can cast between gsize and gpointer safely */ + g_once_init_leave (&shared_thread_data, (gsize) data); } - shared_thread_data = g_new0 (SharedThreadData, 1); - shared_thread_data->num_users = 1; - - error = NULL; - shared_thread_data->context = g_main_context_new (); - shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE); - shared_thread_data->thread = g_thread_create (shared_thread_func, - NULL, - TRUE, - &error); - g_assert_no_error (error); - - have_thread: - - data = g_new0 (CallerData, 1); - data->func = func; - data->user_data = user_data; - data->done = FALSE; - - idle_source = g_idle_source_new (); - g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); - g_source_set_callback (idle_source, - invoke_caller, - data, - NULL); - g_source_attach (idle_source, shared_thread_data->context); - g_source_unref (idle_source); - - /* wait for the user code to run.. hmm.. probably use a condition variable instead */ - while (!data->done) - g_thread_yield (); - - g_free (data); - - G_UNLOCK (shared_thread_lock); + ret = (SharedThreadData*) shared_thread_data; + g_atomic_int_inc (&ret->refcount); + return ret; } static void -_g_dbus_shared_thread_unref (void) +_g_dbus_shared_thread_unref (SharedThreadData *data) { /* TODO: actually destroy the shared thread here */ #if 0 - G_LOCK (shared_thread_lock); - g_assert (shared_thread_data != NULL); - shared_thread_data->num_users -= 1; - if (shared_thread_data->num_users == 0) - { - g_main_loop_quit (shared_thread_data->loop); - //g_thread_join (shared_thread_data->thread); - g_main_loop_unref (shared_thread_data->loop); - g_main_context_unref (shared_thread_data->context); - g_free (shared_thread_data); - shared_thread_data = NULL; - G_UNLOCK (shared_thread_lock); - } - else + g_assert (data != NULL); + if (g_atomic_int_dec_and_test (&data->refcount)) { - G_UNLOCK (shared_thread_lock); + g_main_loop_quit (data->loop); + //g_thread_join (data->thread); + g_main_loop_unref (data->loop); + g_main_context_unref (data->context); } #endif } /* ---------------------------------------------------------------------------------------------------- */ +typedef enum { + PENDING_NONE = 0, + PENDING_WRITE, + PENDING_FLUSH, + PENDING_CLOSE +} OutputPending; + struct GDBusWorker { volatile gint ref_count; - gboolean stopped; + SharedThreadData *shared_thread_data; + + /* really a boolean, but GLib 2.28 lacks atomic boolean ops */ + volatile gint stopped; /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently * only affects messages received from the other peer (since GDBusServer is the * only user) - we might want it to affect messages sent to the other peer too? */ gboolean frozen; + GDBusCapabilityFlags capabilities; GQueue *received_messages_while_frozen; GIOStream *stream; - GDBusCapabilityFlags capabilities; GCancellable *cancellable; GDBusWorkerMessageReceivedCallback message_received_callback; GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback; GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; - GThread *thread; - - /* if not NULL, stream is GSocketConnection */ + /* if GSocket and GKdbus are NULL, stream is GSocketConnection */ GSocket *socket; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + GKdbus *kdbus; +#endif /* used for reading */ - GMutex *read_lock; + GMutex read_lock; gchar *read_buffer; gsize read_buffer_allocated_size; gsize read_buffer_cur_size; @@ -383,19 +478,40 @@ struct GDBusWorker GSocketControlMessage **read_ancillary_messages; gint read_num_ancillary_messages; + /* Whether an async write, flush or close, or none of those, is pending. + * Only the worker thread may change its value, and only with the write_lock. + * Other threads may read its value when holding the write_lock. + * The worker thread may read its value at any time. + */ + OutputPending output_pending; /* used for writing */ - GMutex *write_lock; + GMutex write_lock; + /* queue of MessageToWriteData, protected by write_lock */ GQueue *write_queue; - gboolean write_is_pending; + /* protected by write_lock */ guint64 write_num_messages_written; + /* number of messages we'd written out last time we flushed; + * protected by write_lock + */ + guint64 write_num_messages_flushed; + /* list of FlushData, protected by write_lock */ GList *write_pending_flushes; + /* list of CloseData, protected by write_lock */ + GList *pending_close_attempts; + /* no lock - only used from the worker thread */ + gboolean close_expected; }; +static void _g_dbus_worker_unref (GDBusWorker *worker); + +/* ---------------------------------------------------------------------------------------------------- */ + typedef struct { - GMutex *mutex; - GCond *cond; + GMutex mutex; + GCond cond; guint64 number_to_wait_for; + GError *error; } FlushData; struct _MessageToWriteData ; @@ -403,6 +519,32 @@ typedef struct _MessageToWriteData MessageToWriteData; static void message_to_write_data_free (MessageToWriteData *data); +static void read_message_print_transport_debug (gssize bytes_read, + GDBusWorker *worker); + +static void write_message_print_transport_debug (gssize bytes_written, + MessageToWriteData *data); + +typedef struct { + GDBusWorker *worker; + GCancellable *cancellable; + GSimpleAsyncResult *result; +} CloseData; + +static void close_data_free (CloseData *close_data) +{ + if (close_data->cancellable != NULL) + g_object_unref (close_data->cancellable); + + if (close_data->result != NULL) + g_object_unref (close_data->result); + + _g_dbus_worker_unref (close_data->worker); + g_slice_free (CloseData, close_data); +} + +/* ---------------------------------------------------------------------------------------------------- */ + static GDBusWorker * _g_dbus_worker_ref (GDBusWorker *worker) { @@ -417,21 +559,19 @@ _g_dbus_worker_unref (GDBusWorker *worker) { g_assert (worker->write_pending_flushes == NULL); - _g_dbus_shared_thread_unref (); + _g_dbus_shared_thread_unref (worker->shared_thread_data); g_object_unref (worker->stream); - g_mutex_free (worker->read_lock); + g_mutex_clear (&worker->read_lock); g_object_unref (worker->cancellable); if (worker->read_fd_list != NULL) g_object_unref (worker->read_fd_list); - g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL); - g_queue_free (worker->received_messages_while_frozen); - - g_mutex_free (worker->write_lock); - g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL); - g_queue_free (worker->write_queue); + g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref); + g_mutex_clear (&worker->write_lock); + g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free); + g_free (worker->read_buffer); g_free (worker); } @@ -442,7 +582,7 @@ _g_dbus_worker_emit_disconnected (GDBusWorker *worker, gboolean remote_peer_vanished, GError *error) { - if (!worker->stopped) + if (!g_atomic_int_get (&worker->stopped)) worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data); } @@ -450,18 +590,19 @@ static void _g_dbus_worker_emit_message_received (GDBusWorker *worker, GDBusMessage *message) { - if (!worker->stopped) + if (!g_atomic_int_get (&worker->stopped)) worker->message_received_callback (worker, message, worker->user_data); } -static gboolean +static GDBusMessage * _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, GDBusMessage *message) { - gboolean ret; - ret = FALSE; - if (!worker->stopped) + GDBusMessage *ret; + if (!g_atomic_int_get (&worker->stopped)) ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data); + else + ret = message; return ret; } @@ -490,7 +631,7 @@ unfreeze_in_idle_cb (gpointer user_data) GDBusWorker *worker = user_data; GDBusMessage *message; - g_mutex_lock (worker->read_lock); + g_mutex_lock (&worker->read_lock); if (worker->frozen) { while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL) @@ -504,7 +645,7 @@ unfreeze_in_idle_cb (gpointer user_data) { g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0); } - g_mutex_unlock (worker->read_lock); + g_mutex_unlock (&worker->read_lock); return FALSE; } @@ -519,7 +660,8 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker) unfreeze_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); - g_source_attach (idle_source, shared_thread_data->context); + g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb"); + g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } @@ -537,14 +679,28 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, GError *error; gssize bytes_read; - g_mutex_lock (worker->read_lock); + g_mutex_lock (&worker->read_lock); /* If already stopped, don't even process the reply */ - if (worker->stopped) + if (g_atomic_int_get (&worker->stopped)) goto out; error = NULL; - if (worker->socket == NULL) + bytes_read = 0; + + if (FALSE) + { + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + else if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + bytes_read = _g_kdbus_read_finish (worker->kdbus, + res, + &error); + g_error ("[KDBUS] _g_dbus_worker_do_read_cb() - work in progress"); + } +#endif + else if (worker->socket == NULL) bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), res, &error); @@ -582,7 +738,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, { /* TODO: really want a append_steal() */ g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL); - close (fds[n]); + (void) g_close (fds[n], NULL); } } g_free (fds); @@ -618,7 +774,40 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, if (bytes_read == -1) { - _g_dbus_worker_emit_disconnected (worker, TRUE, error); + if (G_UNLIKELY (_g_dbus_debug_transport ())) + { + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " ---- READ ERROR on stream of type %s:\n" + " ---- %s %d: %s\n", + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))), + g_quark_to_string (error->domain), error->code, + error->message); + _g_dbus_debug_print_unlock (); + } + + /* Every async read that uses this callback uses worker->cancellable + * as its GCancellable. worker->cancellable gets cancelled if and only + * if the GDBusConnection tells us to close (either via + * _g_dbus_worker_stop, which is called on last-unref, or directly), + * so a cancelled read must mean our connection was closed locally. + * + * If we're closing, other errors are possible - notably, + * G_IO_ERROR_CLOSED can be seen if we close the stream with an async + * read in-flight. It seems sensible to treat all read errors during + * closing as an expected thing that doesn't trip exit-on-close. + * + * Because close_expected can't be set until we get into the worker + * thread, but the cancellable is signalled sooner (from another + * thread), we do still need to check the error. + */ + if (worker->close_expected || + g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) + _g_dbus_worker_emit_disconnected (worker, FALSE, NULL); + else + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); goto out; } @@ -646,6 +835,8 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; } + read_message_print_transport_debug (bytes_read, worker); + worker->read_buffer_cur_size += bytes_read; if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) { @@ -660,7 +851,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, &error); if (message_len == -1) { - g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message); + g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message); _g_dbus_worker_emit_disconnected (worker, FALSE, error); g_error_free (error); goto out; @@ -701,6 +892,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, if (worker->read_fd_list != NULL) { g_dbus_message_set_unix_fd_list (message, worker->read_fd_list); + g_object_unref (worker->read_fd_list); worker->read_fd_list = NULL; } #endif @@ -741,7 +933,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, } out: - g_mutex_unlock (worker->read_lock); + g_mutex_unlock (&worker->read_lock); /* gives up the reference acquired when calling g_input_stream_read_async() */ _g_dbus_worker_unref (worker); @@ -751,6 +943,28 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) { + /* Note that we do need to keep trying to read even if close_expected is + * true, because only failing a read causes us to signal 'closed'. + */ + + /* [KDBUS] + * For KDBUS transport we don't have to alloc buffer (worker->read_buffer) + * instead of it we use kdbus memory pool. On connection stage KDBUS client + * have to register a memory pool, large enough to carry all backlog of + * data enqueued for the connection. + */ + +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + _g_kdbus_read(worker->kdbus, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + return; + } +#endif + /* if bytes_wanted is zero, it means start reading a message */ if (worker->read_buffer_bytes_wanted == 0) { @@ -791,159 +1005,387 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) } /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ -static void -_g_dbus_worker_do_read (GDBusWorker *worker) +static gboolean +_g_dbus_worker_do_initial_read (gpointer data) { - g_mutex_lock (worker->read_lock); + GDBusWorker *worker = data; + g_mutex_lock (&worker->read_lock); _g_dbus_worker_do_read_unlocked (worker); - g_mutex_unlock (worker->read_lock); + g_mutex_unlock (&worker->read_lock); + return FALSE; } /* ---------------------------------------------------------------------------------------------------- */ struct _MessageToWriteData { + GDBusWorker *worker; GDBusMessage *message; gchar *blob; gsize blob_size; + + gsize total_written; + GSimpleAsyncResult *simple; + }; static void message_to_write_data_free (MessageToWriteData *data) { - g_object_unref (data->message); + _g_dbus_worker_unref (data->worker); + if (data->message) + g_object_unref (data->message); g_free (data->blob); g_free (data); } /* ---------------------------------------------------------------------------------------------------- */ -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ -static gboolean -write_message (GDBusWorker *worker, - MessageToWriteData *data, - GError **error) +static void write_message_continue_writing (MessageToWriteData *data); + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_WRITE on entry + */ +static void +write_message_async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { - gboolean ret; - GList *l; - GList *ll; + MessageToWriteData *data = user_data; + GSimpleAsyncResult *simple; + gssize bytes_written; + GError *error; + + /* Note: we can't access data->simple after calling g_async_result_complete () because the + * callback can free @data and we're not completing in idle. So use a copy of the pointer. + */ + simple = data->simple; + + error = NULL; + bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object), + res, + &error); + if (bytes_written == -1) + { + g_simple_async_result_take_error (simple, error); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + g_assert (bytes_written > 0); /* zero is never returned */ + + write_message_print_transport_debug (bytes_written, data); + + data->total_written += bytes_written; + g_assert (data->total_written <= data->blob_size); + if (data->total_written == data->blob_size) + { + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + + write_message_continue_writing (data); + + out: + ; +} - g_return_val_if_fail (data->blob_size > 16, FALSE); +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_WRITE on entry + */ +#ifdef G_OS_UNIX +static gboolean +on_socket_ready (GSocket *socket, + GIOCondition condition, + gpointer user_data) +{ + MessageToWriteData *data = user_data; + write_message_continue_writing (data); + return FALSE; /* remove source */ +} +#endif - ret = FALSE; +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_WRITE on entry + */ +static void +write_message_continue_writing (MessageToWriteData *data) +{ + GOutputStream *ostream; +#ifdef G_OS_UNIX + GSimpleAsyncResult *simple; + GUnixFDList *fd_list; +#endif - /* First, the initial 16 bytes - special case UNIX sockets here - * since it may involve writing an ancillary message with file - * descriptors +#ifdef G_OS_UNIX + /* Note: we can't access data->simple after calling g_async_result_complete () because the + * callback can free @data and we're not completing in idle. So use a copy of the pointer. */ + simple = data->simple; +#endif + + ostream = g_io_stream_get_output_stream (data->worker->stream); +#ifdef G_OS_UNIX + fd_list = g_dbus_message_get_unix_fd_list (data->message); +#endif + + g_assert (!g_output_stream_has_pending (ostream)); + g_assert_cmpint (data->total_written, <, data->blob_size); + if (FALSE) { } #ifdef G_OS_UNIX - else if (worker->socket != NULL) + else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0) { GOutputVector vector; - GSocketControlMessage *message; - GUnixFDList *fd_list; + GSocketControlMessage *control_message; gssize bytes_written; + GError *error; - fd_list = g_dbus_message_get_unix_fd_list (data->message); + vector.buffer = data->blob; + vector.size = data->blob_size; - message = NULL; - if (fd_list != NULL) + control_message = NULL; + if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0) { - if (!G_IS_UNIX_CONNECTION (worker->stream)) + if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) { - g_set_error (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor on unsupported stream of type %s", - g_type_name (G_TYPE_FROM_INSTANCE (worker->stream))); + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor but remote peer does not support this capability"); + g_simple_async_result_complete (simple); + g_object_unref (simple); goto out; } - else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) - { - g_set_error_literal (error, - G_IO_ERROR, - G_IO_ERROR_INVALID_ARGUMENT, - "Tried sending a file descriptor but remote peer does not support this capability"); - goto out; - } - message = g_unix_fd_message_new_with_fd_list (fd_list); + control_message = g_unix_fd_message_new_with_fd_list (fd_list); } - vector.buffer = data->blob; - vector.size = 16; - - bytes_written = g_socket_send_message (worker->socket, + error = NULL; + bytes_written = g_socket_send_message (data->worker->socket, NULL, /* address */ &vector, 1, - message != NULL ? &message : NULL, - message != NULL ? 1 : 0, + control_message != NULL ? &control_message : NULL, + control_message != NULL ? 1 : 0, G_SOCKET_MSG_NONE, - worker->cancellable, - error); + data->worker->cancellable, + &error); + if (control_message != NULL) + g_object_unref (control_message); + if (bytes_written == -1) { - g_prefix_error (error, _("Error writing first 16 bytes of message to socket: ")); - if (message != NULL) - g_object_unref (message); + /* Handle WOULD_BLOCK by waiting until there's room in the buffer */ + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + GSource *source; + source = g_socket_create_source (data->worker->socket, + G_IO_OUT | G_IO_HUP | G_IO_ERR, + data->worker->cancellable); + g_source_set_callback (source, + (GSourceFunc) on_socket_ready, + data, + NULL); /* GDestroyNotify */ + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + g_error_free (error); + goto out; + } + g_simple_async_result_take_error (simple, error); + g_simple_async_result_complete (simple); + g_object_unref (simple); goto out; } - if (message != NULL) - g_object_unref (message); + g_assert (bytes_written > 0); /* zero is never returned */ - if (bytes_written < 16) + write_message_print_transport_debug (bytes_written, data); + + data->total_written += bytes_written; + g_assert (data->total_written <= data->blob_size); + if (data->total_written == data->blob_size) { - /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary - * messages are sent? - */ - g_assert_not_reached (); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; } + + write_message_continue_writing (data); } -#endif /* #ifdef G_OS_UNIX */ +#endif else { - /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */ - if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), - (const gchar *) data->blob, - 16, - NULL, /* bytes_written */ - worker->cancellable, /* cancellable */ - error)) - goto out; - } - - /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */ - if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), - (const gchar *) data->blob + 16, - data->blob_size - 16, - NULL, /* bytes_written */ - worker->cancellable, /* cancellable */ - error)) - goto out; - - ret = TRUE; - - /* wake up pending flushes */ - g_mutex_lock (worker->write_lock); - worker->write_num_messages_written += 1; - for (l = worker->write_pending_flushes; l != NULL; l = ll) - { - FlushData *f = l->data; - ll = l->next; - - if (f->number_to_wait_for == worker->write_num_messages_written) +#ifdef G_OS_UNIX + if (fd_list != NULL) { - g_mutex_lock (f->mutex); - g_cond_signal (f->cond); - g_mutex_unlock (f->mutex); - worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor on unsupported stream of type %s", + g_type_name (G_TYPE_FROM_INSTANCE (ostream))); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; } +#endif + + g_output_stream_write_async (ostream, + (const gchar *) data->blob + data->total_written, + data->blob_size - data->total_written, + G_PRIORITY_DEFAULT, + data->worker->cancellable, + write_message_async_cb, + data); } - g_mutex_unlock (worker->write_lock); +#ifdef G_OS_UNIX + out: +#endif + ; +} +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_WRITE on entry + */ +static void +write_message_async (GDBusWorker *worker, + MessageToWriteData *data, + GAsyncReadyCallback callback, + gpointer user_data) +{ + data->simple = g_simple_async_result_new (NULL, + callback, + user_data, + write_message_async); + data->total_written = 0; + write_message_continue_writing (data); +} + +/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ +static gboolean +write_message_finish (GAsyncResult *res, + GError **error) +{ + g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res)) == write_message_async); + if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error)) + return FALSE; + else + return TRUE; +} +/* ---------------------------------------------------------------------------------------------------- */ + +static void continue_writing (GDBusWorker *worker); + +typedef struct +{ + GDBusWorker *worker; + GList *flushers; +} FlushAsyncData; + +static void +flush_data_list_complete (const GList *flushers, + const GError *error) +{ + const GList *l; + + for (l = flushers; l != NULL; l = l->next) + { + FlushData *f = l->data; + + f->error = error != NULL ? g_error_copy (error) : NULL; + + g_mutex_lock (&f->mutex); + g_cond_signal (&f->cond); + g_mutex_unlock (&f->mutex); + } +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_FLUSH on entry + */ +static void +ostream_flush_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + FlushAsyncData *data = user_data; + GError *error; + + error = NULL; + g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), + res, + &error); + + if (error == NULL) + { + if (G_UNLIKELY (_g_dbus_debug_transport ())) + { + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " ---- FLUSHED stream of type %s\n", + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); + _g_dbus_debug_print_unlock (); + } + } + + g_assert (data->flushers != NULL); + flush_data_list_complete (data->flushers, error); + g_list_free (data->flushers); + + if (error != NULL) + g_error_free (error); + + /* Make sure we tell folks that we don't have additional + flushes pending */ + g_mutex_lock (&data->worker->write_lock); + data->worker->write_num_messages_flushed = data->worker->write_num_messages_written; + g_assert (data->worker->output_pending == PENDING_FLUSH); + data->worker->output_pending = PENDING_NONE; + g_mutex_unlock (&data->worker->write_lock); + + /* OK, cool, finally kick off the next write */ + continue_writing (data->worker); + + _g_dbus_worker_unref (data->worker); + g_free (data); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_FLUSH on entry + */ +static void +start_flush (FlushAsyncData *data) +{ + g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), + G_PRIORITY_DEFAULT, + data->worker->cancellable, + ostream_flush_cb, + data); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry + * output_pending is PENDING_NONE on entry + */ +static void +message_written_unlocked (GDBusWorker *worker, + MessageToWriteData *message_data) +{ if (G_UNLIKELY (_g_dbus_debug_message ())) { gchar *s; @@ -951,71 +1393,361 @@ write_message (GDBusWorker *worker, g_print ("========================================================================\n" "GDBus-debug:Message:\n" " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", - data->blob_size); - s = g_dbus_message_print (data->message, 2); + message_data->blob_size); + s = g_dbus_message_print (message_data->message, 2); g_print ("%s", s); g_free (s); if (G_UNLIKELY (_g_dbus_debug_payload ())) { - s = _g_dbus_hexdump (data->blob, data->blob_size, 2); + s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2); g_print ("%s\n", s); g_free (s); } _g_dbus_debug_print_unlock (); } - out: - return ret; + worker->write_num_messages_written += 1; } -/* ---------------------------------------------------------------------------------------------------- */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry + * output_pending is PENDING_NONE on entry + * + * Returns: non-%NULL, setting @output_pending, if we need to flush now + */ +static FlushAsyncData * +prepare_flush_unlocked (GDBusWorker *worker) +{ + GList *l; + GList *ll; + GList *flushers; -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ -static gboolean -write_message_in_idle_cb (gpointer user_data) + flushers = NULL; + for (l = worker->write_pending_flushes; l != NULL; l = ll) + { + FlushData *f = l->data; + ll = l->next; + + if (f->number_to_wait_for == worker->write_num_messages_written) + { + flushers = g_list_append (flushers, f); + worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); + } + } + if (flushers != NULL) + { + g_assert (worker->output_pending == PENDING_NONE); + worker->output_pending = PENDING_FLUSH; + } + + if (flushers != NULL) + { + FlushAsyncData *data; + + data = g_new0 (FlushAsyncData, 1); + data->worker = _g_dbus_worker_ref (worker); + data->flushers = flushers; + return data; + } + + return NULL; +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_WRITE on entry + */ +static void +write_message_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + MessageToWriteData *data = user_data; + GError *error; + + g_mutex_lock (&data->worker->write_lock); + g_assert (data->worker->output_pending == PENDING_WRITE); + data->worker->output_pending = PENDING_NONE; + + error = NULL; + if (!write_message_finish (res, &error)) + { + g_mutex_unlock (&data->worker->write_lock); + + /* TODO: handle */ + _g_dbus_worker_emit_disconnected (data->worker, TRUE, error); + g_error_free (error); + + g_mutex_lock (&data->worker->write_lock); + } + + message_written_unlocked (data->worker, data); + + g_mutex_unlock (&data->worker->write_lock); + + continue_writing (data->worker); + + message_to_write_data_free (data); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending is PENDING_CLOSE on entry + */ +static void +iostream_close_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) { GDBusWorker *worker = user_data; - gboolean more_writes_are_pending; + GError *error = NULL; + GList *pending_close_attempts, *pending_flush_attempts; + GQueue *send_queue; + + g_io_stream_close_finish (worker->stream, res, &error); + + g_mutex_lock (&worker->write_lock); + + pending_close_attempts = worker->pending_close_attempts; + worker->pending_close_attempts = NULL; + + pending_flush_attempts = worker->write_pending_flushes; + worker->write_pending_flushes = NULL; + + send_queue = worker->write_queue; + worker->write_queue = g_queue_new (); + + g_assert (worker->output_pending == PENDING_CLOSE); + worker->output_pending = PENDING_NONE; + + g_mutex_unlock (&worker->write_lock); + + while (pending_close_attempts != NULL) + { + CloseData *close_data = pending_close_attempts->data; + + pending_close_attempts = g_list_delete_link (pending_close_attempts, + pending_close_attempts); + + if (close_data->result != NULL) + { + if (error != NULL) + g_simple_async_result_set_from_error (close_data->result, error); + + /* this must be in an idle because the result is likely to be + * intended for another thread + */ + g_simple_async_result_complete_in_idle (close_data->result); + } + + close_data_free (close_data); + } + + g_clear_error (&error); + + /* all messages queued for sending are discarded */ + g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free); + /* all queued flushes fail */ + error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED, + _("Operation was cancelled")); + flush_data_list_complete (pending_flush_attempts, error); + g_list_free (pending_flush_attempts); + g_clear_error (&error); + + _g_dbus_worker_unref (worker); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending must be PENDING_NONE on entry + */ +static void +continue_writing (GDBusWorker *worker) +{ MessageToWriteData *data; - gboolean message_was_dropped; - GError *error; + FlushAsyncData *flush_async_data; - g_mutex_lock (worker->write_lock); - data = g_queue_pop_head (worker->write_queue); - g_assert (data != NULL); - more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); - worker->write_is_pending = more_writes_are_pending; - g_mutex_unlock (worker->write_lock); + write_next: + /* we mustn't try to write two things at once */ + g_assert (worker->output_pending == PENDING_NONE); + + g_mutex_lock (&worker->write_lock); + + data = NULL; + flush_async_data = NULL; + + /* if we want to close the connection, that takes precedence */ + if (worker->pending_close_attempts != NULL) + { + worker->close_expected = TRUE; + worker->output_pending = PENDING_CLOSE; + + g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, + NULL, iostream_close_cb, + _g_dbus_worker_ref (worker)); + } + else + { + flush_async_data = prepare_flush_unlocked (worker); + + if (flush_async_data == NULL) + { + data = g_queue_pop_head (worker->write_queue); + + if (data != NULL) + worker->output_pending = PENDING_WRITE; + } + } + + g_mutex_unlock (&worker->write_lock); /* Note that write_lock is only used for protecting the @write_queue - * and @write_is_pending fields of the GDBusWorker struct ... which we + * and @output_pending fields of the GDBusWorker struct ... which we * need to modify from arbitrary threads in _g_dbus_worker_send_message(). * * Therefore, it's fine to drop it here when calling back into user * code and then writing the message out onto the GIOStream since this * function only runs on the worker thread. */ - message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); - if (G_LIKELY (!message_was_dropped)) + + if (flush_async_data != NULL) { - error = NULL; - if (!write_message (worker, - data, - &error)) + start_flush (flush_async_data); + g_assert (data == NULL); + } + else if (data != NULL) + { + GDBusMessage *old_message; + guchar *new_blob; + gsize new_blob_size; + GError *error; + + old_message = data->message; + data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); + if (data->message == old_message) + { + /* filters had no effect - do nothing */ + } + else if (data->message == NULL) { - /* TODO: handle */ - _g_dbus_worker_emit_disconnected (worker, TRUE, error); - g_error_free (error); + /* filters dropped message */ + g_mutex_lock (&worker->write_lock); + worker->output_pending = PENDING_NONE; + g_mutex_unlock (&worker->write_lock); + message_to_write_data_free (data); + goto write_next; } + else + { + /* filters altered the message -> reencode */ + error = NULL; + new_blob = g_dbus_message_to_blob (data->message, + &new_blob_size, + worker->capabilities, + &error); + if (new_blob == NULL) + { + /* if filter make the GDBusMessage unencodeable, just complain on stderr and send + * the old message instead + */ + g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s", + g_dbus_message_get_serial (data->message), + error->message); + g_error_free (error); + } + else + { + g_free (data->blob); + data->blob = (gchar *) new_blob; + data->blob_size = new_blob_size; + } + } + + write_message_async (worker, + data, + write_message_cb, + data); } - message_to_write_data_free (data); +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is not held on entry + * output_pending may be anything + */ +static gboolean +continue_writing_in_idle_cb (gpointer user_data) +{ + GDBusWorker *worker = user_data; + + /* Because this is the worker thread, we can read this struct member + * without holding the lock: no other thread ever modifies it. + */ + if (worker->output_pending == PENDING_NONE) + continue_writing (worker); + + return FALSE; +} + +/** + * @write_data: (transfer full) (allow-none): + * @flush_data: (transfer full) (allow-none): + * @close_data: (transfer full) (allow-none): + * + * Can be called from any thread + * + * write_lock is held on entry + * output_pending may be anything + */ +static void +schedule_writing_unlocked (GDBusWorker *worker, + MessageToWriteData *write_data, + FlushData *flush_data, + CloseData *close_data) +{ + if (write_data != NULL) + g_queue_push_tail (worker->write_queue, write_data); + + if (flush_data != NULL) + worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data); + + if (close_data != NULL) + worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts, + close_data); - return more_writes_are_pending; + /* If we had output pending, the next bit of output will happen + * automatically when it finishes, so we only need to do this + * if nothing was pending. + * + * The idle callback will re-check that output_pending is still + * PENDING_NONE, to guard against output starting before the idle. + */ + if (worker->output_pending == PENDING_NONE) + { + GSource *idle_source; + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + continue_writing_in_idle_cb, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb"); + g_source_attach (idle_source, worker->shared_thread_data->context); + g_source_unref (idle_source); + } } /* ---------------------------------------------------------------------------------------------------- */ -/* can be called from any thread - steals blob */ +/* can be called from any thread - steals blob + * + * write_lock is not held on entry + * output_pending may be anything + */ void _g_dbus_worker_send_message (GDBusWorker *worker, GDBusMessage *message, @@ -1029,43 +1761,18 @@ _g_dbus_worker_send_message (GDBusWorker *worker, g_return_if_fail (blob_len > 16); data = g_new0 (MessageToWriteData, 1); + data->worker = _g_dbus_worker_ref (worker); data->message = g_object_ref (message); data->blob = blob; /* steal! */ data->blob_size = blob_len; - g_mutex_lock (worker->write_lock); - g_queue_push_tail (worker->write_queue, data); - if (!worker->write_is_pending) - { - GSource *idle_source; - - worker->write_is_pending = TRUE; - - idle_source = g_idle_source_new (); - g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); - g_source_set_callback (idle_source, - write_message_in_idle_cb, - _g_dbus_worker_ref (worker), - (GDestroyNotify) _g_dbus_worker_unref); - g_source_attach (idle_source, shared_thread_data->context); - g_source_unref (idle_source); - } - g_mutex_unlock (worker->write_lock); + g_mutex_lock (&worker->write_lock); + schedule_writing_unlocked (worker, data, NULL, NULL); + g_mutex_unlock (&worker->write_lock); } /* ---------------------------------------------------------------------------------------------------- */ -static void -_g_dbus_worker_thread_begin_func (gpointer user_data) -{ - GDBusWorker *worker = user_data; - - worker->thread = g_thread_self (); - - /* begin reading */ - _g_dbus_worker_do_read (worker); -} - GDBusWorker * _g_dbus_worker_new (GIOStream *stream, GDBusCapabilityFlags capabilities, @@ -1076,6 +1783,7 @@ _g_dbus_worker_new (GIOStream *stream, gpointer user_data) { GDBusWorker *worker; + GSource *idle_source; g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL); g_return_val_if_fail (message_received_callback != NULL, NULL); @@ -1085,7 +1793,7 @@ _g_dbus_worker_new (GIOStream *stream, worker = g_new0 (GDBusWorker, 1); worker->ref_count = 1; - worker->read_lock = g_mutex_new (); + g_mutex_init (&worker->read_lock); worker->message_received_callback = message_received_callback; worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback; worker->disconnected_callback = disconnected_callback; @@ -1093,40 +1801,88 @@ _g_dbus_worker_new (GIOStream *stream, worker->stream = g_object_ref (stream); worker->capabilities = capabilities; worker->cancellable = g_cancellable_new (); + worker->output_pending = PENDING_NONE; worker->frozen = initially_frozen; worker->received_messages_while_frozen = g_queue_new (); - worker->write_lock = g_mutex_new (); + g_mutex_init (&worker->write_lock); worker->write_queue = g_queue_new (); if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); - _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream)); +#endif + + worker->shared_thread_data = _g_dbus_shared_thread_ref (); + + /* begin reading */ + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + _g_dbus_worker_do_initial_read, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read"); + g_source_attach (idle_source, worker->shared_thread_data->context); + g_source_unref (idle_source); return worker; } /* ---------------------------------------------------------------------------------------------------- */ -/* This can be called from any thread - frees worker - guarantees no callbacks - * will ever be issued again +/* can be called from any thread + * + * write_lock is not held on entry + * output_pending may be anything + */ +void +_g_dbus_worker_close (GDBusWorker *worker, + GCancellable *cancellable, + GSimpleAsyncResult *result) +{ + CloseData *close_data; + + close_data = g_slice_new0 (CloseData); + close_data->worker = _g_dbus_worker_ref (worker); + close_data->cancellable = + (cancellable == NULL ? NULL : g_object_ref (cancellable)); + close_data->result = (result == NULL ? NULL : g_object_ref (result)); + + /* Don't set worker->close_expected here - we're in the wrong thread. + * It'll be set before the actual close happens. + */ + g_cancellable_cancel (worker->cancellable); + g_mutex_lock (&worker->write_lock); + schedule_writing_unlocked (worker, NULL, NULL, close_data); + g_mutex_unlock (&worker->write_lock); +} + +/* This can be called from any thread - frees worker. Note that + * callbacks might still happen if called from another thread than the + * worker - use your own synchronization primitive in the callbacks. + * + * write_lock is not held on entry + * output_pending may be anything */ void _g_dbus_worker_stop (GDBusWorker *worker) { - /* If we're called in the worker thread it means we are called from - * a worker callback. This is fine, we just can't lock in that case since - * we're already holding the lock... + g_atomic_int_set (&worker->stopped, TRUE); + + /* Cancel any pending operations and schedule a close of the underlying I/O + * stream in the worker thread */ - if (g_thread_self () != worker->thread) - g_mutex_lock (worker->read_lock); - worker->stopped = TRUE; - if (g_thread_self () != worker->thread) - g_mutex_unlock (worker->read_lock); + _g_dbus_worker_close (worker, NULL, NULL); - g_cancellable_cancel (worker->cancellable); + /* _g_dbus_worker_close holds a ref until after an idle in the worker + * thread has run, so we no longer need to unref in an idle like in + * commit 322e25b535 + */ _g_dbus_worker_unref (worker); } @@ -1135,6 +1891,9 @@ _g_dbus_worker_stop (GDBusWorker *worker) /* can be called from any thread (except the worker thread) - blocks * calling thread until all queued outgoing messages are written and * the transport has been flushed + * + * write_lock is not held on entry + * output_pending may be anything */ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, @@ -1143,50 +1902,68 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker, { gboolean ret; FlushData *data; + guint64 pending_writes; data = NULL; + ret = TRUE; + + g_mutex_lock (&worker->write_lock); + + /* if the queue is empty, no write is in-flight and we haven't written + * anything since the last flush, then there's nothing to wait for + */ + pending_writes = g_queue_get_length (worker->write_queue); + + /* if a write is in-flight, we shouldn't be satisfied until the first + * flush operation that follows it + */ + if (worker->output_pending == PENDING_WRITE) + pending_writes += 1; - /* if the queue is empty, there's nothing to wait for */ - g_mutex_lock (worker->write_lock); - if (g_queue_get_length (worker->write_queue) > 0) + if (pending_writes > 0 || + worker->write_num_messages_written != worker->write_num_messages_flushed) { data = g_new0 (FlushData, 1); - data->mutex = g_mutex_new (); - data->cond = g_cond_new (); - data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue); - g_mutex_lock (data->mutex); - worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data); + g_mutex_init (&data->mutex); + g_cond_init (&data->cond); + data->number_to_wait_for = worker->write_num_messages_written + pending_writes; + g_mutex_lock (&data->mutex); + + schedule_writing_unlocked (worker, NULL, data, NULL); } - g_mutex_unlock (worker->write_lock); + g_mutex_unlock (&worker->write_lock); if (data != NULL) { - g_cond_wait (data->cond, data->mutex); - g_mutex_unlock (data->mutex); + g_cond_wait (&data->cond, &data->mutex); + g_mutex_unlock (&data->mutex); - /* note:the element is removed from worker->write_pending_flushes in write_message() */ - g_cond_free (data->cond); - g_mutex_free (data->mutex); + /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */ + g_cond_clear (&data->cond); + g_mutex_clear (&data->mutex); + if (data->error != NULL) + { + ret = FALSE; + g_propagate_error (error, data->error); + } g_free (data); } - ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream), - cancellable, - error); return ret; } /* ---------------------------------------------------------------------------------------------------- */ #define G_DBUS_DEBUG_AUTHENTICATION (1<<0) -#define G_DBUS_DEBUG_MESSAGE (1<<1) -#define G_DBUS_DEBUG_PAYLOAD (1<<2) -#define G_DBUS_DEBUG_CALL (1<<3) -#define G_DBUS_DEBUG_SIGNAL (1<<4) -#define G_DBUS_DEBUG_INCOMING (1<<5) -#define G_DBUS_DEBUG_RETURN (1<<6) -#define G_DBUS_DEBUG_EMISSION (1<<7) -#define G_DBUS_DEBUG_ADDRESS (1<<8) +#define G_DBUS_DEBUG_TRANSPORT (1<<1) +#define G_DBUS_DEBUG_MESSAGE (1<<2) +#define G_DBUS_DEBUG_PAYLOAD (1<<3) +#define G_DBUS_DEBUG_CALL (1<<4) +#define G_DBUS_DEBUG_SIGNAL (1<<5) +#define G_DBUS_DEBUG_INCOMING (1<<6) +#define G_DBUS_DEBUG_RETURN (1<<7) +#define G_DBUS_DEBUG_EMISSION (1<<8) +#define G_DBUS_DEBUG_ADDRESS (1<<9) static gint _gdbus_debug_flags = 0; @@ -1198,6 +1975,13 @@ _g_dbus_debug_authentication (void) } gboolean +_g_dbus_debug_transport (void) +{ + _g_dbus_initialize (); + return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0; +} + +gboolean _g_dbus_debug_message (void) { _g_dbus_initialize (); @@ -1267,7 +2051,7 @@ _g_dbus_debug_print_unlock (void) G_UNLOCK (print_lock); } -/* +/** * _g_dbus_initialize: * * Does various one-time init things such as @@ -1286,12 +2070,14 @@ _g_dbus_initialize (void) const gchar *debug; g_dbus_error_domain = G_DBUS_ERROR; + (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */ debug = g_getenv ("G_DBUS_DEBUG"); if (debug != NULL) { const GDebugKey keys[] = { { "authentication", G_DBUS_DEBUG_AUTHENTICATION }, + { "transport", G_DBUS_DEBUG_TRANSPORT }, { "message", G_DBUS_DEBUG_MESSAGE }, { "payload", G_DBUS_DEBUG_PAYLOAD }, { "call", G_DBUS_DEBUG_CALL }, @@ -1410,22 +2196,73 @@ out: gchar * _g_dbus_get_machine_id (GError **error) { +#ifdef G_OS_WIN32 + HW_PROFILE_INFOA info; + char *src, *dest, *res; + int i; + + if (!GetCurrentHwProfileA (&info)) + { + char *message = g_win32_error_message (GetLastError ()); + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_FAILED, + _("Unable to get Hardware profile: %s"), message); + g_free (message); + return NULL; + } + + /* Form: {12340001-4980-1920-6788-123456789012} */ + src = &info.szHwProfileGuid[0]; + + res = g_malloc (32+1); + dest = res; + + src++; /* Skip { */ + for (i = 0; i < 8; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 12; i++) + *dest++ = *src++; + *dest = 0; + + return res; +#else gchar *ret; + GError *first_error; /* TODO: use PACKAGE_LOCALSTATEDIR ? */ ret = NULL; + first_error = NULL; if (!g_file_get_contents ("/var/lib/dbus/machine-id", &ret, NULL, - error)) + &first_error) && + !g_file_get_contents ("/etc/machine-id", + &ret, + NULL, + NULL)) { - g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: ")); + g_propagate_prefixed_error (error, first_error, + _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: ")); } else { + /* ignore the error from the first try, if any */ + g_clear_error (&first_error); /* TODO: validate value */ g_strstrip (ret); } return ret; +#endif } /* ---------------------------------------------------------------------------------------------------- */ @@ -1448,3 +2285,94 @@ _g_dbus_enum_to_string (GType enum_type, gint value) } /* ---------------------------------------------------------------------------------------------------- */ + +static void +write_message_print_transport_debug (gssize bytes_written, + MessageToWriteData *data) +{ + if (G_LIKELY (!_g_dbus_debug_transport ())) + goto out; + + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" + " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n", + bytes_written, + g_dbus_message_get_serial (data->message), + data->blob_size, + data->total_written, + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); + _g_dbus_debug_print_unlock (); + out: + ; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +static void +read_message_print_transport_debug (gssize bytes_read, + GDBusWorker *worker) +{ + gsize size; + gint32 serial; + gint32 message_length; + + if (G_LIKELY (!_g_dbus_debug_transport ())) + goto out; + + size = bytes_read + worker->read_buffer_cur_size; + serial = 0; + message_length = 0; + if (size >= 16) + message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL); + if (size >= 1) + { + switch (worker->read_buffer[0]) + { + case 'l': + if (size >= 12) + serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]); + break; + case 'B': + if (size >= 12) + serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]); + break; + default: + /* an error will be set elsewhere if this happens */ + goto out; + } + } + + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" + " size %d to offset %" G_GSIZE_FORMAT " from a %s\n", + bytes_read, + serial, + message_length, + worker->read_buffer_cur_size, + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream)))); + _g_dbus_debug_print_unlock (); + out: + ; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +gboolean +_g_signal_accumulator_false_handled (GSignalInvocationHint *ihint, + GValue *return_accu, + const GValue *handler_return, + gpointer dummy) +{ + gboolean continue_emission; + gboolean signal_return; + + signal_return = g_value_get_boolean (handler_return); + g_value_set_boolean (return_accu, signal_return); + continue_emission = signal_return; + + return continue_emission; +}