X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgdbusprivate.c;h=91feb414e1b8699d51cfa595536e968e269a3a69;hb=51fac05d73f8363de821eb0d6940dedca13a8c0f;hp=7ccf98f712f446d881c1e5a61139b4bf9c228222;hpb=5f48e2cde5cb2871abb4e07b16140f52244d36ff;p=platform%2Fupstream%2Fglib.git diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 7ccf98f..91feb41 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -13,9 +13,7 @@ * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. + * Public License along with this library; if not, see . * * Author: David Zeuthen */ @@ -24,9 +22,6 @@ #include #include -#ifdef HAVE_UNISTD_H -#include -#endif #include "giotypes.h" #include "gsocket.h" @@ -39,11 +34,14 @@ #include "ginputstream.h" #include "gmemoryinputstream.h" #include "giostream.h" +#include "glib/gstdio.h" #include "gsocketcontrolmessage.h" #include "gsocketconnection.h" #include "gsocketoutputstream.h" #ifdef G_OS_UNIX +#include "gkdbus.h" +#include "gkdbusconnection.h" #include "gunixfdmessage.h" #include "gunixconnection.h" #include "gunixcredentialsmessage.h" @@ -94,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent) /* ---------------------------------------------------------------------------------------------------- */ +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +typedef struct +{ + GKdbus *kdbus; + GCancellable *cancellable; + + GSimpleAsyncResult *simple; + + gboolean from_mainloop; +} ReadKdbusData; + +static void +read_kdbus_data_free (ReadKdbusData *data) +{ + g_object_unref (data->kdbus); + if (data->cancellable != NULL) + g_object_unref (data->cancellable); + g_object_unref (data->simple); + g_free (data); +} + +static gboolean +_g_kdbus_read_ready (GKdbus *kdbus, + GIOCondition condition, + gpointer user_data) +{ + ReadKdbusData *data = user_data; + GError *error = NULL; + gssize result; + + result = _g_kdbus_receive (data->kdbus, + data->cancellable, + &error); + + if (result >= 0) + { + g_simple_async_result_set_op_res_gssize (data->simple, result); + } + else + { + g_assert (error != NULL); + g_simple_async_result_take_error (data->simple, error); + } + + if (data->from_mainloop) + g_simple_async_result_complete (data->simple); + else + g_simple_async_result_complete_in_idle (data->simple); + + return FALSE; +} + +static void +_g_kdbus_read (GKdbus *kdbus, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + ReadKdbusData *data; + GSource *source; + + data = g_new0 (ReadKdbusData, 1); + data->kdbus = g_object_ref (kdbus); + data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL; + + data->simple = g_simple_async_result_new (G_OBJECT (kdbus), + callback, + user_data, + _g_kdbus_read); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); + + data->from_mainloop = TRUE; + source = _g_kdbus_create_source (data->kdbus, + G_IO_IN, + cancellable); + g_source_set_callback (source, + (GSourceFunc) _g_kdbus_read_ready, + data, + (GDestroyNotify) read_kdbus_data_free); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); +} + +static gssize +_g_kdbus_read_finish (GKdbus *kdbus, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (G_IS_KDBUS (kdbus), -1); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + else + return g_simple_async_result_get_op_res_gssize (simple); +} + +#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */ + /* Unfortunately ancillary messages are discarded when reading from a * socket using the GSocketInputStream abstraction. So we provide a * very GInputStream-ish API that uses GSocket in this case (very @@ -191,6 +290,7 @@ _g_socket_read_with_control_messages (GSocket *socket, callback, user_data, _g_socket_read_with_control_messages); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); if (!g_socket_condition_check (socket, G_IO_IN)) { @@ -287,7 +387,6 @@ static SharedThreadData * _g_dbus_shared_thread_ref (void) { static gsize shared_thread_data = 0; - GError *error = NULL; SharedThreadData *ret; if (g_once_init_enter (&shared_thread_data)) @@ -304,10 +403,7 @@ _g_dbus_shared_thread_ref (void) data->loop = g_main_loop_new (data->context, FALSE); data->thread = g_thread_new ("gdbus", gdbus_shared_thread_func, - data, - TRUE, - &error); - g_assert_no_error (error); + data); /* We can cast between gsize and gpointer safely */ g_once_init_leave (&shared_thread_data, (gsize) data); } @@ -335,6 +431,13 @@ _g_dbus_shared_thread_unref (SharedThreadData *data) /* ---------------------------------------------------------------------------------------------------- */ +typedef enum { + PENDING_NONE = 0, + PENDING_WRITE, + PENDING_FLUSH, + PENDING_CLOSE +} OutputPending; + struct GDBusWorker { volatile gint ref_count; @@ -359,8 +462,11 @@ struct GDBusWorker GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; - /* if not NULL, stream is GSocketConnection */ + /* if GSocket and GKdbus are NULL, stream is GSocketConnection */ GSocket *socket; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + GKdbus *kdbus; +#endif /* used for reading */ GMutex read_lock; @@ -372,22 +478,28 @@ struct GDBusWorker GSocketControlMessage **read_ancillary_messages; gint read_num_ancillary_messages; - /* TRUE if an async write, flush or close is pending. + /* Whether an async write, flush or close, or none of those, is pending. * Only the worker thread may change its value, and only with the write_lock. * Other threads may read its value when holding the write_lock. * The worker thread may read its value at any time. */ - gboolean output_pending; + OutputPending output_pending; /* used for writing */ GMutex write_lock; /* queue of MessageToWriteData, protected by write_lock */ GQueue *write_queue; /* protected by write_lock */ guint64 write_num_messages_written; + /* number of messages we'd written out last time we flushed; + * protected by write_lock + */ + guint64 write_num_messages_flushed; /* list of FlushData, protected by write_lock */ GList *write_pending_flushes; /* list of CloseData, protected by write_lock */ GList *pending_close_attempts; + /* no lock - only used from the worker thread */ + gboolean close_expected; }; static void _g_dbus_worker_unref (GDBusWorker *worker); @@ -456,13 +568,9 @@ _g_dbus_worker_unref (GDBusWorker *worker) if (worker->read_fd_list != NULL) g_object_unref (worker->read_fd_list); - g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL); - g_queue_free (worker->received_messages_while_frozen); - + g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref); g_mutex_clear (&worker->write_lock); - g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL); - g_queue_free (worker->write_queue); - + g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free); g_free (worker->read_buffer); g_free (worker); @@ -552,6 +660,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker) unfreeze_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } @@ -577,7 +686,21 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; error = NULL; - if (worker->socket == NULL) + bytes_read = 0; + + if (FALSE) + { + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + else if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + bytes_read = _g_kdbus_read_finish (worker->kdbus, + res, + &error); + g_error ("[KDBUS] _g_dbus_worker_do_read_cb() - work in progress"); + } +#endif + else if (worker->socket == NULL) bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), res, &error); @@ -615,7 +738,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, { /* TODO: really want a append_steal() */ g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL); - close (fds[n]); + (void) g_close (fds[n], NULL); } } g_free (fds); @@ -651,7 +774,40 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, if (bytes_read == -1) { - _g_dbus_worker_emit_disconnected (worker, TRUE, error); + if (G_UNLIKELY (_g_dbus_debug_transport ())) + { + _g_dbus_debug_print_lock (); + g_print ("========================================================================\n" + "GDBus-debug:Transport:\n" + " ---- READ ERROR on stream of type %s:\n" + " ---- %s %d: %s\n", + g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))), + g_quark_to_string (error->domain), error->code, + error->message); + _g_dbus_debug_print_unlock (); + } + + /* Every async read that uses this callback uses worker->cancellable + * as its GCancellable. worker->cancellable gets cancelled if and only + * if the GDBusConnection tells us to close (either via + * _g_dbus_worker_stop, which is called on last-unref, or directly), + * so a cancelled read must mean our connection was closed locally. + * + * If we're closing, other errors are possible - notably, + * G_IO_ERROR_CLOSED can be seen if we close the stream with an async + * read in-flight. It seems sensible to treat all read errors during + * closing as an expected thing that doesn't trip exit-on-close. + * + * Because close_expected can't be set until we get into the worker + * thread, but the cancellable is signalled sooner (from another + * thread), we do still need to check the error. + */ + if (worker->close_expected || + g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) + _g_dbus_worker_emit_disconnected (worker, FALSE, NULL); + else + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); goto out; } @@ -695,7 +851,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, &error); if (message_len == -1) { - g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message); + g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message); _g_dbus_worker_emit_disconnected (worker, FALSE, error); g_error_free (error); goto out; @@ -787,6 +943,28 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) { + /* Note that we do need to keep trying to read even if close_expected is + * true, because only failing a read causes us to signal 'closed'. + */ + + /* [KDBUS] + * For KDBUS transport we don't have to alloc buffer (worker->read_buffer) + * instead of it we use kdbus memory pool. On connection stage KDBUS client + * have to register a memory pool, large enough to carry all backlog of + * data enqueued for the connection. + */ + +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + _g_kdbus_read(worker->kdbus, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + return; + } +#endif + /* if bytes_wanted is zero, it means start reading a message */ if (worker->read_buffer_bytes_wanted == 0) { @@ -868,7 +1046,7 @@ static void write_message_continue_writing (MessageToWriteData *data); /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_WRITE on entry */ static void write_message_async_cb (GObject *source_object, @@ -918,8 +1096,9 @@ write_message_async_cb (GObject *source_object, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_WRITE on entry */ +#ifdef G_OS_UNIX static gboolean on_socket_ready (GSocket *socket, GIOCondition condition, @@ -929,25 +1108,28 @@ on_socket_ready (GSocket *socket, write_message_continue_writing (data); return FALSE; /* remove source */ } +#endif /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_WRITE on entry */ static void write_message_continue_writing (MessageToWriteData *data) { GOutputStream *ostream; - GSimpleAsyncResult *simple; #ifdef G_OS_UNIX + GSimpleAsyncResult *simple; GUnixFDList *fd_list; #endif +#ifdef G_OS_UNIX /* Note: we can't access data->simple after calling g_async_result_complete () because the * callback can free @data and we're not completing in idle. So use a copy of the pointer. */ simple = data->simple; +#endif ostream = g_io_stream_get_output_stream (data->worker->stream); #ifdef G_OS_UNIX @@ -1063,14 +1245,16 @@ write_message_continue_writing (MessageToWriteData *data) write_message_async_cb, data); } +#ifdef G_OS_UNIX out: +#endif ; } /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_WRITE on entry */ static void write_message_async (GDBusWorker *worker, @@ -1086,7 +1270,7 @@ write_message_async (GDBusWorker *worker, write_message_continue_writing (data); } -/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ static gboolean write_message_finish (GAsyncResult *res, GError **error) @@ -1099,7 +1283,7 @@ write_message_finish (GAsyncResult *res, } /* ---------------------------------------------------------------------------------------------------- */ -static void maybe_write_next_message (GDBusWorker *worker); +static void continue_writing (GDBusWorker *worker); typedef struct { @@ -1128,7 +1312,7 @@ flush_data_list_complete (const GList *flushers, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_FLUSH on entry */ static void ostream_flush_cb (GObject *source_object, @@ -1166,12 +1350,13 @@ ostream_flush_cb (GObject *source_object, /* Make sure we tell folks that we don't have additional flushes pending */ g_mutex_lock (&data->worker->write_lock); - g_assert (data->worker->output_pending); - data->worker->output_pending = FALSE; + data->worker->write_num_messages_flushed = data->worker->write_num_messages_written; + g_assert (data->worker->output_pending == PENDING_FLUSH); + data->worker->output_pending = PENDING_NONE; g_mutex_unlock (&data->worker->write_lock); /* OK, cool, finally kick off the next write */ - maybe_write_next_message (data->worker); + continue_writing (data->worker); _g_dbus_worker_unref (data->worker); g_free (data); @@ -1180,17 +1365,27 @@ ostream_flush_cb (GObject *source_object, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is false on entry + * output_pending is PENDING_FLUSH on entry */ static void -message_written (GDBusWorker *worker, - MessageToWriteData *message_data) +start_flush (FlushAsyncData *data) { - GList *l; - GList *ll; - GList *flushers; + g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), + G_PRIORITY_DEFAULT, + data->worker->cancellable, + ostream_flush_cb, + data); +} - /* first log the fact that we wrote a message */ +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry + * output_pending is PENDING_NONE on entry + */ +static void +message_written_unlocked (GDBusWorker *worker, + MessageToWriteData *message_data) +{ if (G_UNLIKELY (_g_dbus_debug_message ())) { gchar *s; @@ -1211,10 +1406,24 @@ message_written (GDBusWorker *worker, _g_dbus_debug_print_unlock (); } - /* then first wake up pending flushes and, if needed, flush the stream */ - flushers = NULL; - g_mutex_lock (&worker->write_lock); worker->write_num_messages_written += 1; +} + +/* called in private thread shared by all GDBusConnection instances + * + * write-lock is held on entry + * output_pending is PENDING_NONE on entry + * + * Returns: non-%NULL, setting @output_pending, if we need to flush now + */ +static FlushAsyncData * +prepare_flush_unlocked (GDBusWorker *worker) +{ + GList *l; + GList *ll; + GList *flushers; + + flushers = NULL; for (l = worker->write_pending_flushes; l != NULL; l = ll) { FlushData *f = l->data; @@ -1228,35 +1437,27 @@ message_written (GDBusWorker *worker, } if (flushers != NULL) { - g_assert (!worker->output_pending); - worker->output_pending = TRUE; + g_assert (worker->output_pending == PENDING_NONE); + worker->output_pending = PENDING_FLUSH; } - g_mutex_unlock (&worker->write_lock); if (flushers != NULL) { FlushAsyncData *data; + data = g_new0 (FlushAsyncData, 1); data->worker = _g_dbus_worker_ref (worker); data->flushers = flushers; - /* flush the stream before writing the next message */ - g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream), - G_PRIORITY_DEFAULT, - worker->cancellable, - ostream_flush_cb, - data); - } - else - { - /* kick off the next write! */ - maybe_write_next_message (worker); + return data; } + + return NULL; } /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_WRITE on entry */ static void write_message_cb (GObject *source_object, @@ -1267,23 +1468,26 @@ write_message_cb (GObject *source_object, GError *error; g_mutex_lock (&data->worker->write_lock); - g_assert (data->worker->output_pending); - data->worker->output_pending = FALSE; - g_mutex_unlock (&data->worker->write_lock); + g_assert (data->worker->output_pending == PENDING_WRITE); + data->worker->output_pending = PENDING_NONE; error = NULL; if (!write_message_finish (res, &error)) { + g_mutex_unlock (&data->worker->write_lock); + /* TODO: handle */ _g_dbus_worker_emit_disconnected (data->worker, TRUE, error); g_error_free (error); + + g_mutex_lock (&data->worker->write_lock); } - /* this function will also kick of the next write (it might need to - * flush so writing the next message might happen much later - * e.g. async) - */ - message_written (data->worker, data); + message_written_unlocked (data->worker, data); + + g_mutex_unlock (&data->worker->write_lock); + + continue_writing (data->worker); message_to_write_data_free (data); } @@ -1291,7 +1495,7 @@ write_message_cb (GObject *source_object, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending is true on entry + * output_pending is PENDING_CLOSE on entry */ static void iostream_close_cb (GObject *source_object, @@ -1316,8 +1520,8 @@ iostream_close_cb (GObject *source_object, send_queue = worker->write_queue; worker->write_queue = g_queue_new (); - g_assert (worker->output_pending); - worker->output_pending = FALSE; + g_assert (worker->output_pending == PENDING_CLOSE); + worker->output_pending = PENDING_NONE; g_mutex_unlock (&worker->write_lock); @@ -1345,9 +1549,7 @@ iostream_close_cb (GObject *source_object, g_clear_error (&error); /* all messages queued for sending are discarded */ - g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL); - g_queue_free (send_queue); - + g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free); /* all queued flushes fail */ error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED, _("Operation was cancelled")); @@ -1361,35 +1563,44 @@ iostream_close_cb (GObject *source_object, /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending must be false on entry + * output_pending must be PENDING_NONE on entry */ static void -maybe_write_next_message (GDBusWorker *worker) +continue_writing (GDBusWorker *worker) { MessageToWriteData *data; + FlushAsyncData *flush_async_data; write_next: /* we mustn't try to write two things at once */ - g_assert (!worker->output_pending); + g_assert (worker->output_pending == PENDING_NONE); g_mutex_lock (&worker->write_lock); + data = NULL; + flush_async_data = NULL; + /* if we want to close the connection, that takes precedence */ if (worker->pending_close_attempts != NULL) { - worker->output_pending = TRUE; + worker->close_expected = TRUE; + worker->output_pending = PENDING_CLOSE; g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, NULL, iostream_close_cb, _g_dbus_worker_ref (worker)); - data = NULL; } else { - data = g_queue_pop_head (worker->write_queue); + flush_async_data = prepare_flush_unlocked (worker); - if (data != NULL) - worker->output_pending = TRUE; + if (flush_async_data == NULL) + { + data = g_queue_pop_head (worker->write_queue); + + if (data != NULL) + worker->output_pending = PENDING_WRITE; + } } g_mutex_unlock (&worker->write_lock); @@ -1402,7 +1613,13 @@ maybe_write_next_message (GDBusWorker *worker) * code and then writing the message out onto the GIOStream since this * function only runs on the worker thread. */ - if (data != NULL) + + if (flush_async_data != NULL) + { + start_flush (flush_async_data); + g_assert (data == NULL); + } + else if (data != NULL) { GDBusMessage *old_message; guchar *new_blob; @@ -1419,7 +1636,7 @@ maybe_write_next_message (GDBusWorker *worker) { /* filters dropped message */ g_mutex_lock (&worker->write_lock); - worker->output_pending = FALSE; + worker->output_pending = PENDING_NONE; g_mutex_unlock (&worker->write_lock); message_to_write_data_free (data); goto write_next; @@ -1460,59 +1677,68 @@ maybe_write_next_message (GDBusWorker *worker) /* called in private thread shared by all GDBusConnection instances * * write-lock is not held on entry - * output_pending may be true or false + * output_pending may be anything */ static gboolean -write_message_in_idle_cb (gpointer user_data) +continue_writing_in_idle_cb (gpointer user_data) { GDBusWorker *worker = user_data; /* Because this is the worker thread, we can read this struct member * without holding the lock: no other thread ever modifies it. */ - if (!worker->output_pending) - maybe_write_next_message (worker); + if (worker->output_pending == PENDING_NONE) + continue_writing (worker); return FALSE; } -/* +/** * @write_data: (transfer full) (allow-none): + * @flush_data: (transfer full) (allow-none): * @close_data: (transfer full) (allow-none): * * Can be called from any thread * - * write_lock is not held on entry - * output_pending may be true or false + * write_lock is held on entry + * output_pending may be anything */ static void -schedule_write_in_worker_thread (GDBusWorker *worker, - MessageToWriteData *write_data, - CloseData *close_data) +schedule_writing_unlocked (GDBusWorker *worker, + MessageToWriteData *write_data, + FlushData *flush_data, + CloseData *close_data) { - g_mutex_lock (&worker->write_lock); - if (write_data != NULL) g_queue_push_tail (worker->write_queue, write_data); + if (flush_data != NULL) + worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data); + if (close_data != NULL) worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts, close_data); - if (!worker->output_pending) + /* If we had output pending, the next bit of output will happen + * automatically when it finishes, so we only need to do this + * if nothing was pending. + * + * The idle callback will re-check that output_pending is still + * PENDING_NONE, to guard against output starting before the idle. + */ + if (worker->output_pending == PENDING_NONE) { GSource *idle_source; idle_source = g_idle_source_new (); g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); g_source_set_callback (idle_source, - write_message_in_idle_cb, + continue_writing_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } - - g_mutex_unlock (&worker->write_lock); } /* ---------------------------------------------------------------------------------------------------- */ @@ -1520,7 +1746,7 @@ schedule_write_in_worker_thread (GDBusWorker *worker, /* can be called from any thread - steals blob * * write_lock is not held on entry - * output_pending may be true or false + * output_pending may be anything */ void _g_dbus_worker_send_message (GDBusWorker *worker, @@ -1540,7 +1766,9 @@ _g_dbus_worker_send_message (GDBusWorker *worker, data->blob = blob; /* steal! */ data->blob_size = blob_len; - schedule_write_in_worker_thread (worker, data, NULL); + g_mutex_lock (&worker->write_lock); + schedule_writing_unlocked (worker, data, NULL, NULL); + g_mutex_unlock (&worker->write_lock); } /* ---------------------------------------------------------------------------------------------------- */ @@ -1573,7 +1801,7 @@ _g_dbus_worker_new (GIOStream *stream, worker->stream = g_object_ref (stream); worker->capabilities = capabilities; worker->cancellable = g_cancellable_new (); - worker->output_pending = FALSE; + worker->output_pending = PENDING_NONE; worker->frozen = initially_frozen; worker->received_messages_while_frozen = g_queue_new (); @@ -1584,6 +1812,11 @@ _g_dbus_worker_new (GIOStream *stream, if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream)); +#endif + worker->shared_thread_data = _g_dbus_shared_thread_ref (); /* begin reading */ @@ -1593,6 +1826,7 @@ _g_dbus_worker_new (GIOStream *stream, _g_dbus_worker_do_initial_read, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); @@ -1604,7 +1838,7 @@ _g_dbus_worker_new (GIOStream *stream, /* can be called from any thread * * write_lock is not held on entry - * output_pending may be true or false + * output_pending may be anything */ void _g_dbus_worker_close (GDBusWorker *worker, @@ -1619,8 +1853,13 @@ _g_dbus_worker_close (GDBusWorker *worker, (cancellable == NULL ? NULL : g_object_ref (cancellable)); close_data->result = (result == NULL ? NULL : g_object_ref (result)); + /* Don't set worker->close_expected here - we're in the wrong thread. + * It'll be set before the actual close happens. + */ g_cancellable_cancel (worker->cancellable); - schedule_write_in_worker_thread (worker, NULL, close_data); + g_mutex_lock (&worker->write_lock); + schedule_writing_unlocked (worker, NULL, NULL, close_data); + g_mutex_unlock (&worker->write_lock); } /* This can be called from any thread - frees worker. Note that @@ -1628,7 +1867,7 @@ _g_dbus_worker_close (GDBusWorker *worker, * worker - use your own synchronization primitive in the callbacks. * * write_lock is not held on entry - * output_pending may be true or false + * output_pending may be anything */ void _g_dbus_worker_stop (GDBusWorker *worker) @@ -1640,7 +1879,7 @@ _g_dbus_worker_stop (GDBusWorker *worker) */ _g_dbus_worker_close (worker, NULL, NULL); - /* _g_dbus_worker_close holds a ref until after an idle in the the worker + /* _g_dbus_worker_close holds a ref until after an idle in the worker * thread has run, so we no longer need to unref in an idle like in * commit 322e25b535 */ @@ -1654,7 +1893,7 @@ _g_dbus_worker_stop (GDBusWorker *worker) * the transport has been flushed * * write_lock is not held on entry - * output_pending may be true or false + * output_pending may be anything */ gboolean _g_dbus_worker_flush_sync (GDBusWorker *worker, @@ -1663,20 +1902,34 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker, { gboolean ret; FlushData *data; + guint64 pending_writes; data = NULL; ret = TRUE; - /* if the queue is empty, there's nothing to wait for */ g_mutex_lock (&worker->write_lock); - if (g_queue_get_length (worker->write_queue) > 0) + + /* if the queue is empty, no write is in-flight and we haven't written + * anything since the last flush, then there's nothing to wait for + */ + pending_writes = g_queue_get_length (worker->write_queue); + + /* if a write is in-flight, we shouldn't be satisfied until the first + * flush operation that follows it + */ + if (worker->output_pending == PENDING_WRITE) + pending_writes += 1; + + if (pending_writes > 0 || + worker->write_num_messages_written != worker->write_num_messages_flushed) { data = g_new0 (FlushData, 1); g_mutex_init (&data->mutex); g_cond_init (&data->cond); - data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue); + data->number_to_wait_for = worker->write_num_messages_written + pending_writes; g_mutex_lock (&data->mutex); - worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data); + + schedule_writing_unlocked (worker, NULL, data, NULL); } g_mutex_unlock (&worker->write_lock); @@ -1687,7 +1940,7 @@ _g_dbus_worker_flush_sync (GDBusWorker *worker, /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */ g_cond_clear (&data->cond); - g_mutex_free (&data->mutex); + g_mutex_clear (&data->mutex); if (data->error != NULL) { ret = FALSE; @@ -1798,7 +2051,7 @@ _g_dbus_debug_print_unlock (void) G_UNLOCK (print_lock); } -/* +/** * _g_dbus_initialize: * * Does various one-time init things such as @@ -1943,22 +2196,73 @@ out: gchar * _g_dbus_get_machine_id (GError **error) { +#ifdef G_OS_WIN32 + HW_PROFILE_INFOA info; + char *src, *dest, *res; + int i; + + if (!GetCurrentHwProfileA (&info)) + { + char *message = g_win32_error_message (GetLastError ()); + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_FAILED, + _("Unable to get Hardware profile: %s"), message); + g_free (message); + return NULL; + } + + /* Form: {12340001-4980-1920-6788-123456789012} */ + src = &info.szHwProfileGuid[0]; + + res = g_malloc (32+1); + dest = res; + + src++; /* Skip { */ + for (i = 0; i < 8; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 4; i++) + *dest++ = *src++; + src++; /* Skip - */ + for (i = 0; i < 12; i++) + *dest++ = *src++; + *dest = 0; + + return res; +#else gchar *ret; + GError *first_error; /* TODO: use PACKAGE_LOCALSTATEDIR ? */ ret = NULL; + first_error = NULL; if (!g_file_get_contents ("/var/lib/dbus/machine-id", &ret, NULL, - error)) + &first_error) && + !g_file_get_contents ("/etc/machine-id", + &ret, + NULL, + NULL)) { - g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: ")); + g_propagate_prefixed_error (error, first_error, + _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: ")); } else { + /* ignore the error from the first try, if any */ + g_clear_error (&first_error); /* TODO: validate value */ g_strstrip (ret); } return ret; +#endif } /* ---------------------------------------------------------------------------------------------------- */ @@ -1992,7 +2296,7 @@ write_message_print_transport_debug (gssize bytes_written, _g_dbus_debug_print_lock (); g_print ("========================================================================\n" "GDBus-debug:Transport:\n" - " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" + " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n", bytes_written, g_dbus_message_get_serial (data->message), @@ -2043,7 +2347,7 @@ read_message_print_transport_debug (gssize bytes_read, _g_dbus_debug_print_lock (); g_print ("========================================================================\n" "GDBus-debug:Transport:\n" - " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" + " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" " size %d to offset %" G_GSIZE_FORMAT " from a %s\n", bytes_read, serial,