/* ---------------------------------------------------------------------------------------------------- */
+typedef enum {
+ PENDING_NONE = 0,
+ PENDING_WRITE,
+ PENDING_FLUSH,
+ PENDING_CLOSE
+} OutputPending;
+
struct GDBusWorker
{
volatile gint ref_count;
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
- /* TRUE if an async write, flush or close is pending.
+ /* Whether an async write, flush or close, or none of those, is pending.
* Only the worker thread may change its value, and only with the write_lock.
* Other threads may read its value when holding the write_lock.
* The worker thread may read its value at any time.
*/
- gboolean output_pending;
+ OutputPending output_pending;
/* used for writing */
GMutex write_lock;
/* queue of MessageToWriteData, protected by write_lock */
GList *write_pending_flushes;
/* list of CloseData, protected by write_lock */
GList *pending_close_attempts;
+ /* no lock - only used from the worker thread */
+ gboolean close_expected;
};
static void _g_dbus_worker_unref (GDBusWorker *worker);
if (bytes_read == -1)
{
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+ if (G_UNLIKELY (_g_dbus_debug_transport ()))
+ {
+ _g_dbus_debug_print_lock ();
+ g_print ("========================================================================\n"
+ "GDBus-debug:Transport:\n"
+ " ---- READ ERROR on stream of type %s:\n"
+ " ---- %s %d: %s\n",
+ g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
+ g_quark_to_string (error->domain), error->code,
+ error->message);
+ _g_dbus_debug_print_unlock ();
+ }
+
+ /* Every async read that uses this callback uses worker->cancellable
+ * as its GCancellable. worker->cancellable gets cancelled if and only
+ * if the GDBusConnection tells us to close (either via
+ * _g_dbus_worker_stop, which is called on last-unref, or directly),
+ * so a cancelled read must mean our connection was closed locally.
+ *
+ * If we're closing, other errors are possible - notably,
+ * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
+ * read in-flight. It seems sensible to treat all read errors during
+ * closing as an expected thing that doesn't trip exit-on-close.
+ *
+ * Because close_expected can't be set until we get into the worker
+ * thread, but the cancellable is signalled sooner (from another
+ * thread), we do still need to check the error.
+ */
+ if (worker->close_expected ||
+ g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+ _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
+ else
+ _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+
g_error_free (error);
goto out;
}
static void
_g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
{
+ /* Note that we do need to keep trying to read even if close_expected is
+ * true, because only failing a read causes us to signal 'closed'.
+ */
+
/* if bytes_wanted is zero, it means start reading a message */
if (worker->read_buffer_bytes_wanted == 0)
{
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_async_cb (GObject *source_object,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static gboolean
on_socket_ready (GSocket *socket,
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_continue_writing (MessageToWriteData *data)
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_async (GDBusWorker *worker,
}
/* ---------------------------------------------------------------------------------------------------- */
-static void maybe_write_next_message (GDBusWorker *worker);
+static void continue_writing (GDBusWorker *worker);
typedef struct
{
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_FLUSH on entry
*/
static void
ostream_flush_cb (GObject *source_object,
/* Make sure we tell folks that we don't have additional
flushes pending */
g_mutex_lock (&data->worker->write_lock);
- g_assert (data->worker->output_pending);
- data->worker->output_pending = FALSE;
+ g_assert (data->worker->output_pending == PENDING_FLUSH);
+ data->worker->output_pending = PENDING_NONE;
g_mutex_unlock (&data->worker->write_lock);
/* OK, cool, finally kick off the next write */
- maybe_write_next_message (data->worker);
+ continue_writing (data->worker);
_g_dbus_worker_unref (data->worker);
g_free (data);
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is false on entry
+ * output_pending is PENDING_NONE on entry
*/
static void
message_written (GDBusWorker *worker,
}
if (flushers != NULL)
{
- g_assert (!worker->output_pending);
- worker->output_pending = TRUE;
+ g_assert (worker->output_pending == PENDING_NONE);
+ worker->output_pending = PENDING_FLUSH;
}
g_mutex_unlock (&worker->write_lock);
else
{
/* kick off the next write! */
- maybe_write_next_message (worker);
+ continue_writing (worker);
}
}
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_WRITE on entry
*/
static void
write_message_cb (GObject *source_object,
GError *error;
g_mutex_lock (&data->worker->write_lock);
- g_assert (data->worker->output_pending);
- data->worker->output_pending = FALSE;
+ g_assert (data->worker->output_pending == PENDING_WRITE);
+ data->worker->output_pending = PENDING_NONE;
g_mutex_unlock (&data->worker->write_lock);
error = NULL;
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending is true on entry
+ * output_pending is PENDING_CLOSE on entry
*/
static void
iostream_close_cb (GObject *source_object,
send_queue = worker->write_queue;
worker->write_queue = g_queue_new ();
- g_assert (worker->output_pending);
- worker->output_pending = FALSE;
+ g_assert (worker->output_pending == PENDING_CLOSE);
+ worker->output_pending = PENDING_NONE;
g_mutex_unlock (&worker->write_lock);
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending must be false on entry
+ * output_pending must be PENDING_NONE on entry
*/
static void
-maybe_write_next_message (GDBusWorker *worker)
+continue_writing (GDBusWorker *worker)
{
MessageToWriteData *data;
write_next:
/* we mustn't try to write two things at once */
- g_assert (!worker->output_pending);
+ g_assert (worker->output_pending == PENDING_NONE);
g_mutex_lock (&worker->write_lock);
/* if we want to close the connection, that takes precedence */
if (worker->pending_close_attempts != NULL)
{
- worker->output_pending = TRUE;
+ worker->close_expected = TRUE;
+ worker->output_pending = PENDING_CLOSE;
g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
NULL, iostream_close_cb,
data = g_queue_pop_head (worker->write_queue);
if (data != NULL)
- worker->output_pending = TRUE;
+ worker->output_pending = PENDING_WRITE;
}
g_mutex_unlock (&worker->write_lock);
{
/* filters dropped message */
g_mutex_lock (&worker->write_lock);
- worker->output_pending = FALSE;
+ worker->output_pending = PENDING_NONE;
g_mutex_unlock (&worker->write_lock);
message_to_write_data_free (data);
goto write_next;
/* called in private thread shared by all GDBusConnection instances
*
* write-lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
static gboolean
-write_message_in_idle_cb (gpointer user_data)
+continue_writing_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
/* Because this is the worker thread, we can read this struct member
* without holding the lock: no other thread ever modifies it.
*/
- if (!worker->output_pending)
- maybe_write_next_message (worker);
+ if (worker->output_pending == PENDING_NONE)
+ continue_writing (worker);
return FALSE;
}
*
* Can be called from any thread
*
- * write_lock is not held on entry
- * output_pending may be true or false
+ * write_lock is held on entry
+ * output_pending may be anything
*/
static void
-schedule_write_in_worker_thread (GDBusWorker *worker,
- MessageToWriteData *write_data,
- CloseData *close_data)
+schedule_writing_unlocked (GDBusWorker *worker,
+ MessageToWriteData *write_data,
+ CloseData *close_data)
{
- g_mutex_lock (&worker->write_lock);
-
if (write_data != NULL)
g_queue_push_tail (worker->write_queue, write_data);
worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
close_data);
- if (!worker->output_pending)
+ if (worker->output_pending == PENDING_NONE)
{
GSource *idle_source;
idle_source = g_idle_source_new ();
g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
g_source_set_callback (idle_source,
- write_message_in_idle_cb,
+ continue_writing_in_idle_cb,
_g_dbus_worker_ref (worker),
(GDestroyNotify) _g_dbus_worker_unref);
g_source_attach (idle_source, worker->shared_thread_data->context);
g_source_unref (idle_source);
}
-
- g_mutex_unlock (&worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
/* can be called from any thread - steals blob
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_send_message (GDBusWorker *worker,
data->blob = blob; /* steal! */
data->blob_size = blob_len;
- schedule_write_in_worker_thread (worker, data, NULL);
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, data, NULL);
+ g_mutex_unlock (&worker->write_lock);
}
/* ---------------------------------------------------------------------------------------------------- */
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
- worker->output_pending = FALSE;
+ worker->output_pending = PENDING_NONE;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
/* can be called from any thread
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_close (GDBusWorker *worker,
(cancellable == NULL ? NULL : g_object_ref (cancellable));
close_data->result = (result == NULL ? NULL : g_object_ref (result));
+ /* Don't set worker->close_expected here - we're in the wrong thread.
+ * It'll be set before the actual close happens.
+ */
g_cancellable_cancel (worker->cancellable);
- schedule_write_in_worker_thread (worker, NULL, close_data);
+ g_mutex_lock (&worker->write_lock);
+ schedule_writing_unlocked (worker, NULL, close_data);
+ g_mutex_unlock (&worker->write_lock);
}
/* This can be called from any thread - frees worker. Note that
* worker - use your own synchronization primitive in the callbacks.
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
* the transport has been flushed
*
* write_lock is not held on entry
- * output_pending may be true or false
+ * output_pending may be anything
*/
gboolean
_g_dbus_worker_flush_sync (GDBusWorker *worker,
_g_dbus_get_machine_id (GError **error)
{
gchar *ret;
+ GError *first_error;
/* TODO: use PACKAGE_LOCALSTATEDIR ? */
ret = NULL;
+ first_error = NULL;
if (!g_file_get_contents ("/var/lib/dbus/machine-id",
&ret,
NULL,
- error))
+ &first_error) &&
+ !g_file_get_contents ("/etc/machine-id",
+ &ret,
+ NULL,
+ NULL))
{
- g_prefix_error (error, _("Unable to load /var/lib/dbus/machine-id: "));
+ g_propagate_prefixed_error (error, first_error,
+ _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
}
else
{
+ /* ignore the error from the first try, if any */
+ g_clear_error (&first_error);
/* TODO: validate value */
g_strstrip (ret);
}