X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgdbusprivate.c;h=27d63bc38a8761443965dc5d651bc7ace2842f3c;hb=d4c39ffb96567b7182e8f4df1aea35320285bc72;hp=762e676c50ff048fa3a4ffc2fdc12b86b05aa562;hpb=078dbda148a81af1b3a76fbda72f089b963087f1;p=platform%2Fupstream%2Fglib.git diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 762e676..27d63bc 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -40,6 +40,8 @@ #include "gsocketoutputstream.h" #ifdef G_OS_UNIX +#include "gkdbus.h" +#include "gkdbusconnection.h" #include "gunixfdmessage.h" #include "gunixconnection.h" #include "gunixcredentialsmessage.h" @@ -90,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent) /* ---------------------------------------------------------------------------------------------------- */ +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +typedef struct +{ + GKdbus *kdbus; + GCancellable *cancellable; + + GSimpleAsyncResult *simple; + + gboolean from_mainloop; +} ReadKdbusData; + +static void +read_kdbus_data_free (ReadKdbusData *data) +{ + g_object_unref (data->kdbus); + if (data->cancellable != NULL) + g_object_unref (data->cancellable); + g_object_unref (data->simple); + g_free (data); +} + +static gboolean +_g_kdbus_read_ready (GKdbus *kdbus, + GIOCondition condition, + gpointer user_data) +{ + ReadKdbusData *data = user_data; + GError *error = NULL; + gssize result; + + result = _g_kdbus_receive (data->kdbus, + data->cancellable, + &error); + + if (result >= 0) + { + g_simple_async_result_set_op_res_gssize (data->simple, result); + } + else + { + g_assert (error != NULL); + g_simple_async_result_take_error (data->simple, error); + } + + if (data->from_mainloop) + g_simple_async_result_complete (data->simple); + else + g_simple_async_result_complete_in_idle (data->simple); + + return FALSE; +} + +static void +_g_kdbus_read (GKdbus *kdbus, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + ReadKdbusData *data; + GSource *source; + + data = g_new0 (ReadKdbusData, 1); + data->kdbus = g_object_ref (kdbus); + data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL; + + data->simple = g_simple_async_result_new (G_OBJECT (kdbus), + callback, + user_data, + _g_kdbus_read); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); + + data->from_mainloop = TRUE; + source = _g_kdbus_create_source (data->kdbus, + G_IO_IN, + cancellable); + g_source_set_callback (source, + (GSourceFunc) _g_kdbus_read_ready, + data, + (GDestroyNotify) read_kdbus_data_free); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); +} + +static gssize +_g_kdbus_read_finish (GKdbus *kdbus, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (G_IS_KDBUS (kdbus), -1); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + else + return g_simple_async_result_get_op_res_gssize (simple); +} + +#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */ + /* Unfortunately ancillary messages are discarded when reading from a * socket using the GSocketInputStream abstraction. So we provide a * very GInputStream-ish API that uses GSocket in this case (very @@ -359,8 +462,11 @@ struct GDBusWorker GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; - /* if not NULL, stream is GSocketConnection */ + /* if GSocket and GKdbus are NULL, stream is GSocketConnection */ GSocket *socket; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + GKdbus *kdbus; +#endif /* used for reading */ GMutex read_lock; @@ -371,6 +477,10 @@ struct GDBusWorker GUnixFDList *read_fd_list; GSocketControlMessage **read_ancillary_messages; gint read_num_ancillary_messages; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + GSList *read_kdbus_msg_items; +#endif + /* Whether an async write, flush or close, or none of those, is pending. * Only the worker thread may change its value, and only with the write_lock. @@ -554,6 +664,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker) unfreeze_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } @@ -579,7 +690,29 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; error = NULL; - if (worker->socket == NULL) + bytes_read = 0; + + if (FALSE) + { + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + else if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + bytes_read = _g_kdbus_read_finish (worker->kdbus, + res, + &error); + + /* [KDBUS] Get all received items*/ + worker->read_kdbus_msg_items = _g_kdbus_get_last_msg_items (worker->kdbus); + + /* [KDBUS] Attach fds (if any) to worker->read_fd_list */ + _g_kdbus_attach_fds_to_msg (worker->kdbus, &worker->read_fd_list); + + /* [KDBUS] For KDBUS transport we read whole message at once*/ + worker->read_buffer_bytes_wanted = bytes_read; + } +#endif + else if (worker->socket == NULL) bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), res, &error); @@ -658,9 +791,8 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, _g_dbus_debug_print_lock (); g_print ("========================================================================\n" "GDBus-debug:Transport:\n" - " ---- READ ERROR on stream of type %s:\n" + " ---- READ ERROR:\n" " ---- %s %d: %s\n", - g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))), g_quark_to_string (error->domain), error->code, error->message); _g_dbus_debug_print_unlock (); @@ -714,7 +846,9 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; } - read_message_print_transport_debug (bytes_read, worker); + /* [KDBUS] don't print transport dbus debug for kdbus connection */ + if (!G_IS_KDBUS_CONNECTION (worker->stream)) + read_message_print_transport_debug (bytes_read, worker); worker->read_buffer_cur_size += bytes_read; if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) @@ -746,25 +880,61 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, /* TODO: use connection->priv->auth to decode the message */ - message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, - worker->read_buffer_cur_size, - worker->capabilities, - &error); - if (message == NULL) + if (FALSE) { - gchar *s; - s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); - g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" - "The error is: %s\n" - "The payload is as follows:\n" - "%s\n", - worker->read_buffer_cur_size, - error->message, - s); - g_free (s); - _g_dbus_worker_emit_disconnected (worker, FALSE, error); - g_error_free (error); - goto out; + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + else if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + GDBusMessageType message_type; + gchar *sender; + gchar *destination; + + message = _g_dbus_message_new_from_kdbus_items (worker->read_kdbus_msg_items, + &error); + + /* [KDBUS] override informations from the user header with kernel msg header */ + sender = _g_kdbus_get_last_msg_sender (worker->kdbus); + g_dbus_message_set_sender (message, sender); + + message_type = g_dbus_message_get_message_type (message); + if (message_type == G_DBUS_MESSAGE_TYPE_SIGNAL) + { + destination = _g_kdbus_get_last_msg_destination (worker->kdbus); + g_dbus_message_set_destination (message, destination); + } + + if (message == NULL) + { + g_warning ("Error decoding D-Bus (kdbus) message\n"); + g_error_free (error); + goto out; + } + } +#endif + else + { + message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, + worker->read_buffer_cur_size, + worker->capabilities, + &error); + + if (message == NULL) + { + gchar *s; + s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); + g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" + "The error is: %s\n" + "The payload is as follows:\n" + "%s\n", + worker->read_buffer_cur_size, + error->message, + s); + g_free (s); + _g_dbus_worker_emit_disconnected (worker, FALSE, error); + g_error_free (error); + goto out; + } } #ifdef G_OS_UNIX @@ -789,7 +959,15 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, g_free (s); if (G_UNLIKELY (_g_dbus_debug_payload ())) { - s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); + if (FALSE) + { + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + else if (G_IS_KDBUS_CONNECTION (worker->stream)) + s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items); +#endif + else + s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); g_print ("%s\n", s); g_free (s); } @@ -812,6 +990,20 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, } out: + +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + /* [KDBUS] release memory occupied by kdbus message */ + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + if (!_g_kdbus_is_closed (worker->kdbus)) + { + _g_kdbus_release_kmsg (worker->kdbus); + worker->read_kdbus_msg_items = NULL; + } + worker->read_buffer = NULL; + } +#endif + g_mutex_unlock (&worker->read_lock); /* gives up the reference acquired when calling g_input_stream_read_async() */ @@ -826,6 +1018,24 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) * true, because only failing a read causes us to signal 'closed'. */ + /* [KDBUS] + * For KDBUS transport we don't have to alloc buffer (worker->read_buffer) + * instead of it we use kdbus memory pool. On connection stage KDBUS client + * have to register a memory pool, large enough to carry all backlog of + * data enqueued for the connection. + */ + +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + _g_kdbus_read(worker->kdbus, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + return; + } +#endif + /* if bytes_wanted is zero, it means start reading a message */ if (worker->read_buffer_bytes_wanted == 0) { @@ -980,22 +1190,39 @@ static void write_message_continue_writing (MessageToWriteData *data) { GOutputStream *ostream; + #ifdef G_OS_UNIX GSimpleAsyncResult *simple; GUnixFDList *fd_list; -#endif -#ifdef G_OS_UNIX /* Note: we can't access data->simple after calling g_async_result_complete () because the * callback can free @data and we're not completing in idle. So use a copy of the pointer. */ simple = data->simple; -#endif - ostream = g_io_stream_get_output_stream (data->worker->stream); -#ifdef G_OS_UNIX fd_list = g_dbus_message_get_unix_fd_list (data->message); -#endif + +#ifdef KDBUS_TRANSPORT + if (G_IS_KDBUS_CONNECTION (data->worker->stream)) + { + GError *error; + error = NULL; + data->total_written = _g_kdbus_send (data->worker, + data->worker->kdbus, + data->message, + fd_list, + data->worker->cancellable, + &error); + + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } +#endif /* KDBUS_TRANSPORT */ + +#endif /* G_OS_UNIX */ + + ostream = g_io_stream_get_output_stream (data->worker->stream); g_assert (!g_output_stream_has_pending (ostream)); g_assert_cmpint (data->total_written, <, data->blob_size); @@ -1231,11 +1458,33 @@ ostream_flush_cb (GObject *source_object, static void start_flush (FlushAsyncData *data) { - g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), - G_PRIORITY_DEFAULT, - data->worker->cancellable, - ostream_flush_cb, - data); + /*[KDBUS]: TODO: to investigate */ + if (G_IS_KDBUS_CONNECTION (data->worker->stream)) + { + g_assert (data->flushers != NULL); + flush_data_list_complete (data->flushers, NULL); + g_list_free (data->flushers); + + g_mutex_lock (&data->worker->write_lock); + data->worker->write_num_messages_flushed = data->worker->write_num_messages_written; + g_assert (data->worker->output_pending == PENDING_FLUSH); + data->worker->output_pending = PENDING_NONE; + g_mutex_unlock (&data->worker->write_lock); + + /* OK, cool, finally kick off the next write */ + continue_writing (data->worker); + + _g_dbus_worker_unref (data->worker); + g_free (data); + } + else + { + g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), + G_PRIORITY_DEFAULT, + data->worker->cancellable, + ostream_flush_cb, + data); + } } /* called in private thread shared by all GDBusConnection instances @@ -1506,6 +1755,21 @@ continue_writing (GDBusWorker *worker) { /* filters altered the message -> reencode */ error = NULL; + + /* [KDBUS] + * Setting protocol version, before invoking g_dbus_message_to_blob() will + * be removed after preparing new function only for kdbus transport purposes + * (this function will be able to create blob directly/unconditionally in memfd + * object, without making copy): + * + * [1] https://code.google.com/p/d-bus/source/browse/TODO + */ + + if (G_IS_KDBUS_CONNECTION (worker->stream)) + _g_dbus_message_set_protocol_ver (data->message,2); + else + _g_dbus_message_set_protocol_ver (data->message,1); + new_blob = g_dbus_message_to_blob (data->message, &new_blob_size, worker->capabilities, @@ -1554,7 +1818,7 @@ continue_writing_in_idle_cb (gpointer user_data) return FALSE; } -/* +/** * @write_data: (transfer full) (allow-none): * @flush_data: (transfer full) (allow-none): * @close_data: (transfer full) (allow-none): @@ -1596,6 +1860,7 @@ schedule_writing_unlocked (GDBusWorker *worker, continue_writing_in_idle_cb, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); } @@ -1672,6 +1937,11 @@ _g_dbus_worker_new (GIOStream *stream, if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream)); +#endif + worker->shared_thread_data = _g_dbus_shared_thread_ref (); /* begin reading */ @@ -1681,6 +1951,7 @@ _g_dbus_worker_new (GIOStream *stream, _g_dbus_worker_do_initial_read, _g_dbus_worker_ref (worker), (GDestroyNotify) _g_dbus_worker_unref); + g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read"); g_source_attach (idle_source, worker->shared_thread_data->context); g_source_unref (idle_source); @@ -1905,7 +2176,7 @@ _g_dbus_debug_print_unlock (void) G_UNLOCK (print_lock); } -/* +/** * _g_dbus_initialize: * * Does various one-time init things such as