#include "gsocketoutputstream.h"
#ifdef G_OS_UNIX
+#include "gkdbus.h"
+#include "gkdbusconnection.h"
#include "gunixfdmessage.h"
#include "gunixconnection.h"
#include "gunixcredentialsmessage.h"
/* ---------------------------------------------------------------------------------------------------- */
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+typedef struct
+{
+ GKdbus *kdbus;
+ GCancellable *cancellable;
+
+ GSimpleAsyncResult *simple;
+
+ gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData *data)
+{
+ g_object_unref (data->kdbus);
+ if (data->cancellable != NULL)
+ g_object_unref (data->cancellable);
+ g_object_unref (data->simple);
+ g_free (data);
+}
+
+static gboolean
+_g_kdbus_read_ready (GKdbus *kdbus,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ ReadKdbusData *data = user_data;
+ GError *error = NULL;
+ gssize result;
+
+ result = _g_kdbus_receive (data->kdbus,
+ data->cancellable,
+ &error);
+
+ if (result >= 0)
+ {
+ g_simple_async_result_set_op_res_gssize (data->simple, result);
+ }
+ else
+ {
+ g_assert (error != NULL);
+ g_simple_async_result_take_error (data->simple, error);
+ }
+
+ if (data->from_mainloop)
+ g_simple_async_result_complete (data->simple);
+ else
+ g_simple_async_result_complete_in_idle (data->simple);
+
+ return FALSE;
+}
+
+static void
+_g_kdbus_read (GKdbus *kdbus,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ ReadKdbusData *data;
+ GSource *source;
+
+ data = g_new0 (ReadKdbusData, 1);
+ data->kdbus = g_object_ref (kdbus);
+ data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+
+ data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+ callback,
+ user_data,
+ _g_kdbus_read);
+ g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+ data->from_mainloop = TRUE;
+ source = _g_kdbus_create_source (data->kdbus,
+ G_IO_IN,
+ cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) _g_kdbus_read_ready,
+ data,
+ (GDestroyNotify) read_kdbus_data_free);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+}
+
+static gssize
+_g_kdbus_read_finish (GKdbus *kdbus,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+ g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return -1;
+ else
+ return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
+
/* Unfortunately ancillary messages are discarded when reading from a
* socket using the GSocketInputStream abstraction. So we provide a
* very GInputStream-ish API that uses GSocket in this case (very
GDBusWorkerDisconnectedCallback disconnected_callback;
gpointer user_data;
- /* if not NULL, stream is GSocketConnection */
+ /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
GSocket *socket;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ GKdbus *kdbus;
+#endif
/* used for reading */
GMutex read_lock;
GUnixFDList *read_fd_list;
GSocketControlMessage **read_ancillary_messages;
gint read_num_ancillary_messages;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ GSList *read_kdbus_msg_items;
+#endif
+
/* Whether an async write, flush or close, or none of those, is pending.
* Only the worker thread may change its value, and only with the write_lock.
goto out;
error = NULL;
- if (worker->socket == NULL)
+ bytes_read = 0;
+
+ if (FALSE)
+ {
+ }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ else if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ bytes_read = _g_kdbus_read_finish (worker->kdbus,
+ res,
+ &error);
+
+ /* [KDBUS] Get all received items*/
+ worker->read_kdbus_msg_items = _g_kdbus_get_last_msg_items (worker->kdbus);
+
+ /* [KDBUS] Attach fds (if any) to worker->read_fd_list */
+ _g_kdbus_attach_fds_to_msg (worker->kdbus, &worker->read_fd_list);
+
+ /* [KDBUS] For KDBUS transport we read whole message at once*/
+ worker->read_buffer_bytes_wanted = bytes_read;
+ }
+#endif
+ else if (worker->socket == NULL)
bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
res,
&error);
_g_dbus_debug_print_lock ();
g_print ("========================================================================\n"
"GDBus-debug:Transport:\n"
- " ---- READ ERROR on stream of type %s:\n"
+ " ---- READ ERROR:\n"
" ---- %s %d: %s\n",
- g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
g_quark_to_string (error->domain), error->code,
error->message);
_g_dbus_debug_print_unlock ();
goto out;
}
- read_message_print_transport_debug (bytes_read, worker);
+ /* [KDBUS] don't print transport dbus debug for kdbus connection */
+ if (!G_IS_KDBUS_CONNECTION (worker->stream))
+ read_message_print_transport_debug (bytes_read, worker);
worker->read_buffer_cur_size += bytes_read;
if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
/* TODO: use connection->priv->auth to decode the message */
- message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
- worker->read_buffer_cur_size,
- worker->capabilities,
- &error);
- if (message == NULL)
+ if (FALSE)
{
- gchar *s;
- s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
- g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
- "The error is: %s\n"
- "The payload is as follows:\n"
- "%s\n",
- worker->read_buffer_cur_size,
- error->message,
- s);
- g_free (s);
- _g_dbus_worker_emit_disconnected (worker, FALSE, error);
- g_error_free (error);
- goto out;
+ }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ else if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ GDBusMessageType message_type;
+ gchar *sender;
+ gchar *destination;
+
+ message = _g_dbus_message_new_from_kdbus_items (worker->read_kdbus_msg_items,
+ &error);
+
+ /* [KDBUS] override informations from the user header with kernel msg header */
+ sender = _g_kdbus_get_last_msg_sender (worker->kdbus);
+ g_dbus_message_set_sender (message, sender);
+
+ message_type = g_dbus_message_get_message_type (message);
+ if (message_type == G_DBUS_MESSAGE_TYPE_SIGNAL)
+ {
+ destination = _g_kdbus_get_last_msg_destination (worker->kdbus);
+ g_dbus_message_set_destination (message, destination);
+ }
+
+ if (message == NULL)
+ {
+ g_warning ("Error decoding D-Bus (kdbus) message\n");
+ g_error_free (error);
+ goto out;
+ }
+ }
+#endif
+ else
+ {
+ message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
+ worker->read_buffer_cur_size,
+ worker->capabilities,
+ &error);
+
+ if (message == NULL)
+ {
+ gchar *s;
+ s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+ g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
+ "The error is: %s\n"
+ "The payload is as follows:\n"
+ "%s\n",
+ worker->read_buffer_cur_size,
+ error->message,
+ s);
+ g_free (s);
+ _g_dbus_worker_emit_disconnected (worker, FALSE, error);
+ g_error_free (error);
+ goto out;
+ }
}
#ifdef G_OS_UNIX
g_free (s);
if (G_UNLIKELY (_g_dbus_debug_payload ()))
{
- s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+ if (FALSE)
+ {
+ }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ else if (G_IS_KDBUS_CONNECTION (worker->stream))
+ s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items);
+#endif
+ else
+ s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
g_print ("%s\n", s);
g_free (s);
}
}
out:
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ /* [KDBUS] release memory occupied by kdbus message */
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ if (!_g_kdbus_is_closed (worker->kdbus))
+ {
+ _g_kdbus_release_kmsg (worker->kdbus);
+ worker->read_kdbus_msg_items = NULL;
+ }
+ worker->read_buffer = NULL;
+ }
+#endif
+
g_mutex_unlock (&worker->read_lock);
/* gives up the reference acquired when calling g_input_stream_read_async() */
* true, because only failing a read causes us to signal 'closed'.
*/
+ /* [KDBUS]
+ * For KDBUS transport we don't have to alloc buffer (worker->read_buffer)
+ * instead of it we use kdbus memory pool. On connection stage KDBUS client
+ * have to register a memory pool, large enough to carry all backlog of
+ * data enqueued for the connection.
+ */
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ _g_kdbus_read(worker->kdbus,
+ worker->cancellable,
+ (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+ _g_dbus_worker_ref (worker));
+ return;
+ }
+#endif
+
/* if bytes_wanted is zero, it means start reading a message */
if (worker->read_buffer_bytes_wanted == 0)
{
write_message_continue_writing (MessageToWriteData *data)
{
GOutputStream *ostream;
+
#ifdef G_OS_UNIX
GSimpleAsyncResult *simple;
GUnixFDList *fd_list;
-#endif
-#ifdef G_OS_UNIX
/* Note: we can't access data->simple after calling g_async_result_complete () because the
* callback can free @data and we're not completing in idle. So use a copy of the pointer.
*/
simple = data->simple;
-#endif
- ostream = g_io_stream_get_output_stream (data->worker->stream);
-#ifdef G_OS_UNIX
fd_list = g_dbus_message_get_unix_fd_list (data->message);
-#endif
+
+#ifdef KDBUS_TRANSPORT
+ if (G_IS_KDBUS_CONNECTION (data->worker->stream))
+ {
+ GError *error;
+ error = NULL;
+ data->total_written = _g_kdbus_send (data->worker,
+ data->worker->kdbus,
+ data->message,
+ fd_list,
+ data->worker->cancellable,
+ &error);
+
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+ goto out;
+ }
+#endif /* KDBUS_TRANSPORT */
+
+#endif /* G_OS_UNIX */
+
+ ostream = g_io_stream_get_output_stream (data->worker->stream);
g_assert (!g_output_stream_has_pending (ostream));
g_assert_cmpint (data->total_written, <, data->blob_size);
static void
start_flush (FlushAsyncData *data)
{
- g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
- G_PRIORITY_DEFAULT,
- data->worker->cancellable,
- ostream_flush_cb,
- data);
+ /*[KDBUS]: TODO: to investigate */
+ if (G_IS_KDBUS_CONNECTION (data->worker->stream))
+ {
+ g_assert (data->flushers != NULL);
+ flush_data_list_complete (data->flushers, NULL);
+ g_list_free (data->flushers);
+
+ g_mutex_lock (&data->worker->write_lock);
+ data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+ g_assert (data->worker->output_pending == PENDING_FLUSH);
+ data->worker->output_pending = PENDING_NONE;
+ g_mutex_unlock (&data->worker->write_lock);
+
+ /* OK, cool, finally kick off the next write */
+ continue_writing (data->worker);
+
+ _g_dbus_worker_unref (data->worker);
+ g_free (data);
+ }
+ else
+ {
+ g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+ G_PRIORITY_DEFAULT,
+ data->worker->cancellable,
+ ostream_flush_cb,
+ data);
+ }
}
/* called in private thread shared by all GDBusConnection instances
{
/* filters altered the message -> reencode */
error = NULL;
+
+ /* [KDBUS]
+ * Setting protocol version, before invoking g_dbus_message_to_blob() will
+ * be removed after preparing new function only for kdbus transport purposes
+ * (this function will be able to create blob directly/unconditionally in memfd
+ * object, without making copy):
+ *
+ * [1] https://code.google.com/p/d-bus/source/browse/TODO
+ */
+
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ _g_dbus_message_set_protocol_ver (data->message,2);
+ else
+ _g_dbus_message_set_protocol_ver (data->message,1);
+
new_blob = g_dbus_message_to_blob (data->message,
&new_blob_size,
worker->capabilities,
if (G_IS_SOCKET_CONNECTION (worker->stream))
worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+#endif
+
worker->shared_thread_data = _g_dbus_shared_thread_ref ();
/* begin reading */