#include "gasyncresult.h"
#include "gsimpleasyncresult.h"
#include "ginputstream.h"
+#include "gmemoryinputstream.h"
#include "giostream.h"
#include "gsocketcontrolmessage.h"
#include "gsocketconnection.h"
else
{
g_assert (error != NULL);
- g_simple_async_result_set_from_error (data->simple, error);
- g_error_free (error);
+ g_simple_async_result_take_error (data->simple, error);
}
if (data->from_mainloop)
/* ---------------------------------------------------------------------------------------------------- */
+/* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+
+static GPtrArray *ensured_classes = NULL;
+
+static void
+ensure_type (GType gtype)
+{
+ g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
+}
+
+static void
+released_required_types (void)
+{
+ g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
+ g_ptr_array_unref (ensured_classes);
+ ensured_classes = NULL;
+}
+
+static void
+ensure_required_types (void)
+{
+ g_assert (ensured_classes == NULL);
+ ensured_classes = g_ptr_array_new ();
+ ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT);
+ ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
+}
+/* ---------------------------------------------------------------------------------------------------- */
+
G_LOCK_DEFINE_STATIC (shared_thread_lock);
typedef struct
static SharedThreadData *shared_thread_data = NULL;
static gpointer
-shared_thread_func (gpointer data)
+gdbus_shared_thread_func (gpointer data)
{
g_main_context_push_thread_default (shared_thread_data->context);
g_main_loop_run (shared_thread_data->loop);
return FALSE;
}
+/* ---------------------------------------------------------------------------------------------------- */
+
static void
_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func,
gpointer user_data)
GError *error;
GSource *idle_source;
CallerData *data;
+ gboolean release_types;
G_LOCK (shared_thread_lock);
+ release_types = FALSE;
+
if (shared_thread_data != NULL)
{
shared_thread_data->num_users += 1;
shared_thread_data = g_new0 (SharedThreadData, 1);
shared_thread_data->num_users = 1;
+ /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
+ ensure_required_types ();
+ release_types = TRUE;
+
error = NULL;
shared_thread_data->context = g_main_context_new ();
shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE);
- shared_thread_data->thread = g_thread_create (shared_thread_func,
+ shared_thread_data->thread = g_thread_create (gdbus_shared_thread_func,
NULL,
TRUE,
&error);
while (!data->done)
g_thread_yield ();
+ if (release_types)
+ released_required_types ();
+
g_free (data);
G_UNLOCK (shared_thread_lock);
gint num_writes_pending;
guint64 write_num_messages_written;
GList *write_pending_flushes;
+ gboolean flush_pending;
};
/* ---------------------------------------------------------------------------------------------------- */
worker->message_received_callback (worker, message, worker->user_data);
}
-static gboolean
+static GDBusMessage *
_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
GDBusMessage *message)
{
- gboolean ret;
- ret = FALSE;
+ GDBusMessage *ret;
if (!worker->stopped)
ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+ else
+ ret = message;
return ret;
}
if (worker->read_fd_list != NULL)
{
g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
+ g_object_unref (worker->read_fd_list);
worker->read_fd_list = NULL;
}
#endif
message_to_write_data_free (MessageToWriteData *data)
{
_g_dbus_worker_unref (data->worker);
- g_object_unref (data->message);
+ if (data->message)
+ g_object_unref (data->message);
g_free (data->blob);
g_free (data);
}
&error);
if (bytes_written == -1)
{
- g_simple_async_result_set_from_error (simple, error);
- g_error_free (error);
+ g_simple_async_result_take_error (simple, error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
if (bytes_written == -1)
{
/* Handle WOULD_BLOCK by waiting until there's room in the buffer */
- if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_WOULD_BLOCK)
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
GSource *source;
source = g_socket_create_source (data->worker->socket,
NULL); /* GDestroyNotify */
g_source_attach (source, g_main_context_get_thread_default ());
g_source_unref (source);
+ g_error_free (error);
goto out;
}
- g_simple_async_result_set_from_error (simple, error);
- g_error_free (error);
+ g_simple_async_result_take_error (simple, error);
g_simple_async_result_complete (simple);
g_object_unref (simple);
goto out;
if (error != NULL)
g_error_free (error);
+ /* Make sure we tell folks that we don't have additional
+ flushes pending */
+ g_mutex_lock (data->worker->write_lock);
+ data->worker->flush_pending = FALSE;
+ g_mutex_unlock (data->worker->write_lock);
+
/* OK, cool, finally kick off the next write */
maybe_write_next_message (data->worker);
worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
}
}
+ if (flushers != NULL)
+ {
+ worker->flush_pending = TRUE;
+ }
g_mutex_unlock (worker->write_lock);
if (flushers != NULL)
*/
if (data != NULL)
{
- gboolean message_was_dropped;
- message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
- if (G_UNLIKELY (message_was_dropped))
+ GDBusMessage *old_message;
+ guchar *new_blob;
+ gsize new_blob_size;
+ GError *error;
+
+ old_message = data->message;
+ data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (data->message == old_message)
+ {
+ /* filters had no effect - do nothing */
+ }
+ else if (data->message == NULL)
{
+ /* filters dropped message */
g_mutex_lock (worker->write_lock);
worker->num_writes_pending -= 1;
g_mutex_unlock (worker->write_lock);
}
else
{
- write_message_async (worker,
- data,
- write_message_cb,
- data);
+ /* filters altered the message -> reencode */
+ error = NULL;
+ new_blob = g_dbus_message_to_blob (data->message,
+ &new_blob_size,
+ worker->capabilities,
+ &error);
+ if (new_blob == NULL)
+ {
+ /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
+ * the old message instead
+ */
+ g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
+ g_dbus_message_get_serial (data->message),
+ error->message);
+ g_error_free (error);
+ }
+ else
+ {
+ g_free (data->blob);
+ data->blob = (gchar *) new_blob;
+ data->blob_size = new_blob_size;
+ }
}
+
+ write_message_async (worker,
+ data,
+ write_message_cb,
+ data);
}
}
write_message_in_idle_cb (gpointer user_data)
{
GDBusWorker *worker = user_data;
- if (worker->num_writes_pending == 0)
+ if (worker->num_writes_pending == 0 && !worker->flush_pending)
maybe_write_next_message (worker);
return FALSE;
}
worker->stream = g_object_ref (stream);
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
+ worker->flush_pending = FALSE;
worker->frozen = initially_frozen;
worker->received_messages_while_frozen = g_queue_new ();
/* ---------------------------------------------------------------------------------------------------- */
-/* This can be called from any thread - frees worker - guarantees no callbacks
- * will ever be issued again
+/* This can be called from any thread - frees worker. Note that
+ * callbacks might still happen if called from another thread than the
+ * worker - use your own synchronization primitive in the callbacks.
*/
void
_g_dbus_worker_stop (GDBusWorker *worker)
{
- /* If we're called in the worker thread it means we are called from
- * a worker callback. This is fine, we just can't lock in that case since
- * we're already holding the lock...
- */
- if (g_thread_self () != worker->thread)
- g_mutex_lock (worker->read_lock);
worker->stopped = TRUE;
- if (g_thread_self () != worker->thread)
- g_mutex_unlock (worker->read_lock);
-
g_cancellable_cancel (worker->cancellable);
_g_dbus_worker_unref (worker);
}
const gchar *debug;
g_dbus_error_domain = G_DBUS_ERROR;
+ (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
debug = g_getenv ("G_DBUS_DEBUG");
if (debug != NULL)