* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
*
* Author: David Zeuthen <davidz@redhat.com>
*/
#include <stdlib.h>
#include <string.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
#include "giotypes.h"
#include "gsocket.h"
#include "ginputstream.h"
#include "gmemoryinputstream.h"
#include "giostream.h"
+#include "glib/gstdio.h"
#include "gsocketcontrolmessage.h"
#include "gsocketconnection.h"
#include "gsocketoutputstream.h"
callback,
user_data,
_g_socket_read_with_control_messages);
+ g_simple_async_result_set_check_cancellable (data->simple, cancellable);
if (!g_socket_condition_check (socket, G_IO_IN))
{
_g_dbus_shared_thread_ref (void)
{
static gsize shared_thread_data = 0;
- GError *error = NULL;
SharedThreadData *ret;
if (g_once_init_enter (&shared_thread_data))
data->context = g_main_context_new ();
data->loop = g_main_loop_new (data->context, FALSE);
- data->thread = g_thread_create (gdbus_shared_thread_func,
- data,
- TRUE,
- &error);
- g_assert_no_error (error);
+ data->thread = g_thread_new ("gdbus",
+ gdbus_shared_thread_func,
+ data);
/* We can cast between gsize and gpointer safely */
g_once_init_leave (&shared_thread_data, (gsize) data);
}
/* ---------------------------------------------------------------------------------------------------- */
+typedef enum {
+ PENDING_NONE = 0,
+ PENDING_WRITE,
+ PENDING_FLUSH,
+ PENDING_CLOSE
+} OutputPending;
+
struct GDBusWorker
{
volatile gint ref_count;
SharedThreadData *shared_thread_data;
- gboolean stopped;
+ /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
+ volatile gint stopped;
/* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
* only affects messages received from the other peer (since GDBusServer is the
GSocket *socket;
/* used for reading */
- GMutex *read_lock;
+ GMutex read_lock;
gchar *read_buffer;
gsize read_buffer_allocated_size;
gsize read_buffer_cur_size;
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
- /* TRUE if an async write, flush or close is pending.
+ /* Whether an async write, flush or close, or none of those, is pending.
* Only the worker thread may change its value, and only with the write_lock.
* Other threads may read its value when holding the write_lock.
* The worker thread may read its value at any time.
*/
- gboolean output_pending;
+ OutputPending output_pending;
/* used for writing */
- GMutex *write_lock;
+ GMutex write_lock;
/* queue of MessageToWriteData, protected by write_lock */
GQueue *write_queue;
/* protected by write_lock */
guint64 write_num_messages_written;
+ /* number of messages we'd written out last time we flushed;
+ * protected by write_lock
+ */
+ guint64 write_num_messages_flushed;
/* list of FlushData, protected by write_lock */
GList *write_pending_flushes;
/* list of CloseData, protected by write_lock */
GList *pending_close_attempts;
+ /* no lock - only used from the worker thread */
+ gboolean close_expected;
};
static void _g_dbus_worker_unref (GDBusWorker *worker);
typedef struct
{
- GMutex *mutex;
- GCond *cond;
+ GMutex mutex;
+ GCond cond;
guint64 number_to_wait_for;
GError *error;
} FlushData;
g_object_unref (worker->stream);
- g_mutex_free (worker->read_lock);
+ g_mutex_clear (&worker->read_lock);
g_object_unref (worker->cancellable);
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
- g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
- g_queue_free (worker->received_messages_while_frozen);
-
- g_mutex_free (worker->write_lock);
- g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
- g_queue_free (worker->write_queue);
-
+ g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
+ g_mutex_clear (&worker->write_lock);
+ g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
g_free (worker->read_buffer);
g_free (worker);
gboolean remote_peer_vanished,
GError *error)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
}
_g_dbus_worker_emit_message_received (GDBusWorker *worker,
GDBusMessage *message)
{
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
worker->message_received_callback (worker, message, worker->user_data);
}
GDBusMessage *message)
{
GDBusMessage *ret;
- if (!worker->stopped)
+ if (!g_atomic_int_get (&worker->stopped))
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
else
ret = message;
GDBusWorker *worker = user_data;
GDBusMessage *message;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
if (worker->frozen)
{
while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
{
g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
}
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
return FALSE;
}
unfreeze_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
+ g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
GError *error;
gssize bytes_read;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
/* If already stopped, don't even process the reply */
- if (worker->stopped)
+ if (g_atomic_int_get (&worker->stopped))
goto out;
error = NULL;
{
/* TODO: really want a append_steal() */
g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
- close (fds[n]);
+ (void) g_close (fds[n], NULL);
}
}
g_free (fds);
if (bytes_read == -1)
{
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+ if (G_UNLIKELY (_g_dbus_debug_transport ()))
+ {
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " ---- READ ERROR on stream of type %s:\n"
+ " ---- %s %d: %s\n",
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+ g_quark_to_string (error->domain), error->code,
+ error->message);
+ _g_dbus_debug_print_unlock ();
+ }
+
+ /* Every async read that uses this callback uses worker->cancellable
+ * as its GCancellable. worker->cancellable gets cancelled if and only
+ * if the GDBusConnection tells us to close (either via
+ * _g_dbus_worker_stop, which is called on last-unref, or directly),
+ * so a cancelled read must mean our connection was closed locally.
+ *
+ * If we're closing, other errors are possible - notably,
+ * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+ * read in-flight. It seems sensible to treat all read errors during
+ * closing as an expected thing that doesn't trip exit-on-close.
+ *
+ * Because close_expected can't be set until we get into the worker
+ * thread, but the cancellable is signalled sooner (from another
+ * thread), we do still need to check the error.
+ */
+ if (worker->close_expected ||
+ g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+ _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+ else
+ _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
g_error_free (error);
goto out;
}
&error);
if (message_len == -1)
{
- g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
+ g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
_g_dbus_worker_emit_disconnected (worker, FALSE, error);
g_error_free (error);
goto out;
}
out:
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
_g_dbus_worker_unref (worker);
static void
_g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
{
+ /* Note that we do need to keep trying to read even if close_expected is
+ * true, because only failing a read causes us to signal 'closed'.
+ */
+
/* if bytes_wanted is zero, it means start reading a message */
if (worker->read_buffer_bytes_wanted == 0)
{
_g_dbus_worker_do_initial_read (gpointer data)
{
GDBusWorker *worker = data;
- g_mutex_lock (worker->read_lock);
+ g_mutex_lock (&worker->read_lock);
_g_dbus_worker_do_read_unlocked (worker);
- g_mutex_unlock (worker->read_lock);
+ g_mutex_unlock (&worker->read_lock);
return FALSE;
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_async_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
+#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket *socket,
GIOCondition condition,
write_message_continue_writing (data);
return FALSE; /* remove source */
}
+#endif
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_continue_writing (MessageToWriteData *data)
{
GOutputStream *ostream;
- GSimpleAsyncResult *simple;
#ifdef G_OS_UNIX
+ GSimpleAsyncResult *simple;
GUnixFDList *fd_list;
#endif
+#ifdef G_OS_UNIX
/* Note: we can't access data->simple after calling g_async_result_complete () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
simple = data->simple;
+#endif
ostream = g_io_stream_get_output_stream (data->worker->stream);
#ifdef G_OS_UNIX
write_message_async_cb,
data);
}
+#ifdef G_OS_UNIX
out:
+#endif
;
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_async (GDBusWorker *worker,
write_message_continue_writing (data);
}
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
static gboolean
write_message_finish (GAsyncResult *res,
GError **error)
}
/* ---------------------------------------------------------------------------------------------------- */
-static void maybe_write_next_message (GDBusWorker *worker);
+static void continue_writing (GDBusWorker *worker);
typedef struct
{
f->error = error != NULL ? g_error_copy (error) : NULL;
- g_mutex_lock (f->mutex);
- g_cond_signal (f->cond);
- g_mutex_unlock (f->mutex);
+ g_mutex_lock (&f->mutex);
+ g_cond_signal (&f->cond);
+ g_mutex_unlock (&f->mutex);
}
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_FLUSH on entry
*/
static void
ostream_flush_cb (GObject *source_object,
/* Make sure we tell folks that we don't have additional
flushes pending */
- g_mutex_lock (data->worker->write_lock);
- g_assert (data->worker->output_pending);
- data->worker->output_pending = FALSE;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
+ data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+ g_assert (data->worker->output_pending == PENDING_FLUSH);
+ data->worker->output_pending = PENDING_NONE;
+ g_mutex_unlock (&data->worker->write_lock);
/* OK, cool, finally kick off the next write */
- maybe_write_next_message (data->worker);
+ continue_writing (data->worker);
_g_dbus_worker_unref (data->worker);
g_free (data);
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is false on entry
+ * output_pending is PENDING_FLUSH on entry
*/
static void
-message_written (GDBusWorker *worker,
- MessageToWriteData *message_data)
+start_flush (FlushAsyncData *data)
{
- GList *l;
- GList *ll;
- GList *flushers;
+ g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ ostream_flush_cb,
+ data);
+}
- /* first log the fact that we wrote a message */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+ MessageToWriteData *message_data)
+{
if (G_UNLIKELY (_g_dbus_debug_message ()))
{
gchar *s;
_g_dbus_debug_print_unlock ();
}
- /* then first wake up pending flushes and, if needed, flush the stream */
- flushers = NULL;
- g_mutex_lock (worker->write_lock);
worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+ GList *l;
+ GList *ll;
+ GList *flushers;
+
+ flushers = NULL;
for (l = worker->write_pending_flushes; l != NULL; l = ll)
{
FlushData *f = l->data;
}
if (flushers != NULL)
{
- g_assert (!worker->output_pending);
- worker->output_pending = TRUE;
+ g_assert (worker->output_pending == PENDING_NONE);
+ worker->output_pending = PENDING_FLUSH;
}
- g_mutex_unlock (worker->write_lock);
if (flushers != NULL)
{
FlushAsyncData *data;
+
data = g_new0 (FlushAsyncData, 1);
data->worker = _g_dbus_worker_ref (worker);
data->flushers = flushers;
- /* flush the stream before writing the next message */
- g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
- G_PRIORITY_DEFAULT,
- worker->cancellable,
- ostream_flush_cb,
- data);
- }
- else
- {
- /* kick off the next write! */
- maybe_write_next_message (worker);
+ return data;
}
+
+ return NULL;
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_cb (GObject *source_object,
MessageToWriteData *data = user_data;
GError *error;
- g_mutex_lock (data->worker->write_lock);
- g_assert (data->worker->output_pending);
- data->worker->output_pending = FALSE;
- g_mutex_unlock (data->worker->write_lock);
+ g_mutex_lock (&data->worker->write_lock);
+ g_assert (data->worker->output_pending == PENDING_WRITE);
+ data->worker->output_pending = PENDING_NONE;
error = NULL;
if (!write_message_finish (res, &error))
{
+ g_mutex_unlock (&data->worker->write_lock);
+
/* TODO: handle */
_g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
g_error_free (error);
+
+ g_mutex_lock (&data->worker->write_lock);
}
- /* this function will also kick of the next write (it might need to
- * flush so writing the next message might happen much later
- * e.g. async)
- */
- message_written (data->worker, data);
+ message_written_unlocked (data->worker, data);
+
+ g_mutex_unlock (&data->worker->write_lock);
+
+ continue_writing (data->worker);
message_to_write_data_free (data);
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_CLOSE on entry
*/
static void
iostream_close_cb (GObject *source_object,
g_io_stream_close_finish (worker->stream, res, &error);
- g_mutex_lock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
pending_close_attempts = worker->pending_close_attempts;
worker->pending_close_attempts = NULL;
send_queue = worker->write_queue;
worker->write_queue = g_queue_new ();
- g_assert (worker->output_pending);
- worker->output_pending = FALSE;
+ g_assert (worker->output_pending == PENDING_CLOSE);
+ worker->output_pending = PENDING_NONE;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
while (pending_close_attempts != NULL)
{
g_clear_error (&error);
/* all messages queued for sending are discarded */
- g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
- g_queue_free (send_queue);
-
+ g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
/* all queued flushes fail */
error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
_("Operation was cancelled"));
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending must be false on entry
+ * output_pending must be PENDING_NONE on entry
*/
static void
-maybe_write_next_message (GDBusWorker *worker)
+continue_writing (GDBusWorker *worker)
{
MessageToWriteData *data;
+ FlushAsyncData *flush_async_data;
write_next:
/* we mustn't try to write two things at once */
- g_assert (!worker->output_pending);
+ g_assert (worker->output_pending == PENDING_NONE);
+
+ g_mutex_lock (&worker->write_lock);
- g_mutex_lock (worker->write_lock);
+ data = NULL;
+ flush_async_data = NULL;
/* if we want to close the connection, that takes precedence */
if (worker->pending_close_attempts != NULL)
{
- worker->output_pending = TRUE;
+ worker->close_expected = TRUE;
+ worker->output_pending = PENDING_CLOSE;
g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
NULL, iostream_close_cb,
_g_dbus_worker_ref (worker));
- data = NULL;
}
else
{
- data = g_queue_pop_head (worker->write_queue);
+ flush_async_data = prepare_flush_unlocked (worker);
- if (data != NULL)
- worker->output_pending = TRUE;
+ if (flush_async_data == NULL)
+ {
+ data = g_queue_pop_head (worker->write_queue);
+
+ if (data != NULL)
+ worker->output_pending = PENDING_WRITE;
+ }
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
/* Note that write_lock is only used for protecting the @write_queue
* and @output_pending fields of the GDBusWorker struct ... which we
* code and then writing the message out onto the GIOStream since this
* function only runs on the worker thread.
*/
- if (data != NULL)
+
+ if (flush_async_data != NULL)
+ {
+ start_flush (flush_async_data);
+ g_assert (data == NULL);
+ }
+ else if (data != NULL)
{
GDBusMessage *old_message;
guchar *new_blob;
else if (data->message == NULL)
{
/* filters dropped message */
- g_mutex_lock (worker->write_lock);
- worker->output_pending = FALSE;
- g_mutex_unlock (worker->write_lock);
+ g_mutex_lock (&worker->write_lock);
+ worker->output_pending = PENDING_NONE;
+ g_mutex_unlock (&worker->write_lock);
message_to_write_data_free (data);
goto write_next;
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
static gboolean
-write_message_in_idle_cb (gpointer user_data)
+continue_writing_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
/* Because this is the worker thread, we can read this struct member
* without holding the lock: no other thread ever modifies it.
*/
- if (!worker->output_pending)
- maybe_write_next_message (worker);
+ if (worker->output_pending == PENDING_NONE)
+ continue_writing (worker);
return FALSE;
}
/*
* @write_data: (transfer full) (allow-none):
+ * @flush_data: (transfer full) (allow-none):
* @close_data: (transfer full) (allow-none):
*
* Can be called from any thread
*
- * write_lock is not held on entry
- * output_pending may be true or false
+ * write_lock is held on entry
+ * output_pending may be anything
*/
static void
-schedule_write_in_worker_thread (GDBusWorker *worker,
- MessageToWriteData *write_data,
- CloseData *close_data)
+schedule_writing_unlocked (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ FlushData *flush_data,
+ CloseData *close_data)
{
- g_mutex_lock (worker->write_lock);
-
if (write_data != NULL)
g_queue_push_tail (worker->write_queue, write_data);
+ if (flush_data != NULL)
+ worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
+
if (close_data != NULL)
worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
close_data);
- if (!worker->output_pending)
+ /* If we had output pending, the next bit of output will happen
+ * automatically when it finishes, so we only need to do this
+ * if nothing was pending.
+ *
+ * The idle callback will re-check that output_pending is still
+ * PENDING_NONE, to guard against output starting before the idle.
+ */
+ if (worker->output_pending == PENDING_NONE)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
- write_message_in_idle_cb,
+ continue_writing_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
+ g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
-
- g_mutex_unlock (worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
/* can be called from any thread - steals blob
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_send_message (GDBusWorker *worker,
data->blob = blob; /* steal! */
data->blob_size = blob_len;
- schedule_write_in_worker_thread (worker, data, NULL);
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, data, NULL, NULL);
+ g_mutex_unlock (&worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
worker = g_new0 (GDBusWorker, 1);
worker->ref_count = 1;
- worker->read_lock = g_mutex_new ();
+ g_mutex_init (&worker->read_lock);
worker->message_received_callback = message_received_callback;
worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
worker->disconnected_callback = disconnected_callback;
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
- worker->output_pending = FALSE;
+ worker->output_pending = PENDING_NONE;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
- worker->write_lock = g_mutex_new ();
+ g_mutex_init (&worker->write_lock);
worker->write_queue = g_queue_new ();
if (G_IS_SOCKET_CONNECTION (worker->stream))
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
_g_dbus_worker_do_initial_read,
- worker,
- NULL);
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
/* can be called from any thread
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_close (GDBusWorker *worker,
(cancellable == NULL ? NULL : g_object_ref (cancellable));
close_data->result = (result == NULL ? NULL : g_object_ref (result));
+ /* Don't set worker->close_expected here - we're in the wrong thread.
+ * It'll be set before the actual close happens.
+ */
g_cancellable_cancel (worker->cancellable);
- schedule_write_in_worker_thread (worker, NULL, close_data);
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, NULL, NULL, close_data);
+ g_mutex_unlock (&worker->write_lock);
}
/* This can be called from any thread - frees worker. Note that
* worker - use your own synchronization primitive in the callbacks.
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- worker->stopped = TRUE;
+ g_atomic_int_set (&worker->stopped, TRUE);
/* Cancel any pending operations and schedule a close of the underlying I/O
* stream in the worker thread
*/
_g_dbus_worker_close (worker, NULL, NULL);
- /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+ /* _g_dbus_worker_close holds a ref until after an idle in the worker
* thread has run, so we no longer need to unref in an idle like in
* commit 322e25b535
*/
* the transport has been flushed
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
gboolean
_g_dbus_worker_flush_sync (GDBusWorker *worker,
{
gboolean ret;
FlushData *data;
+ guint64 pending_writes;
data = NULL;
ret = TRUE;
- /* if the queue is empty, there's nothing to wait for */
- g_mutex_lock (worker->write_lock);
- if (g_queue_get_length (worker->write_queue) > 0)
+ g_mutex_lock (&worker->write_lock);
+
+ /* if the queue is empty, no write is in-flight and we haven't written
+ * anything since the last flush, then there's nothing to wait for
+ */
+ pending_writes = g_queue_get_length (worker->write_queue);
+
+ /* if a write is in-flight, we shouldn't be satisfied until the first
+ * flush operation that follows it
+ */
+ if (worker->output_pending == PENDING_WRITE)
+ pending_writes += 1;
+
+ if (pending_writes > 0 ||
+ worker->write_num_messages_written != worker->write_num_messages_flushed)
{
data = g_new0 (FlushData, 1);
- data->mutex = g_mutex_new ();
- data->cond = g_cond_new ();
- data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
- g_mutex_lock (data->mutex);
- worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+ g_mutex_init (&data->mutex);
+ g_cond_init (&data->cond);
+ data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
+ g_mutex_lock (&data->mutex);
+
+ schedule_writing_unlocked (worker, NULL, data, NULL);
}
- g_mutex_unlock (worker->write_lock);
+ g_mutex_unlock (&worker->write_lock);
if (data != NULL)
{
- g_cond_wait (data->cond, data->mutex);
- g_mutex_unlock (data->mutex);
+ g_cond_wait (&data->cond, &data->mutex);
+ g_mutex_unlock (&data->mutex);
/* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
- g_cond_free (data->cond);
- g_mutex_free (data->mutex);
+ g_cond_clear (&data->cond);
+ g_mutex_clear (&data->mutex);
if (data->error != NULL)
{
ret = FALSE;
gchar *
_g_dbus_get_machine_id (GError **error)
{
+#ifdef G_OS_WIN32
+ HW_PROFILE_INFOA info;
+ char *src, *dest, *res;
+ int i;
+
+ if (!GetCurrentHwProfileA (&info))
+ {
+ char *message = g_win32_error_message (GetLastError ());
+ g_set_error (error,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ _("Unable to get Hardware profile: %s"), message);
+ g_free (message);
+ return NULL;
+ }
+
+ /* Form: {12340001-4980-1920-6788-123456789012} */
+ src = &info.szHwProfileGuid[0];
+
+ res = g_malloc (32+1);
+ dest = res;
+
+ src++; /* Skip { */
+ for (i = 0; i < 8; i++)
+ *dest++ = *src++;
+ src++; /* Skip - */
+ for (i = 0; i < 4; i++)
+ *dest++ = *src++;
+ src++; /* Skip - */
+ for (i = 0; i < 4; i++)
+ *dest++ = *src++;
+ src++; /* Skip - */
+ for (i = 0; i < 4; i++)
+ *dest++ = *src++;
+ src++; /* Skip - */
+ for (i = 0; i < 12; i++)
+ *dest++ = *src++;
+ *dest = 0;
+
+ return res;
+#else
gchar *ret;
+ GError *first_error;
/* TODO: use PACKAGE_LOCALSTATEDIR ? */
ret = NULL;
+ first_error = NULL;
if (!g_file_get_contents ("/var/lib/dbus/machine-id",
&ret,
NULL,
- error))
+ &first_error) &&
+ !g_file_get_contents ("/etc/machine-id",
+ &ret,
+ NULL,
+ NULL))
{
- g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+ g_propagate_prefixed_error (error, first_error,
+ _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
}
else
{
+ /* ignore the error from the first try, if any */
+ g_clear_error (&first_error);
/* TODO: validate value */
g_strstrip (ret);
}
return ret;
+#endif
}
/* ---------------------------------------------------------------------------------------------------- */
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
- " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
" size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
bytes_written,
g_dbus_message_get_serial (data->message),
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
- " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+ " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
" size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
bytes_read,
serial,