hook gvariant vectors up to kdbus
[platform/upstream/glib.git] / gio / gdbusprivate.c
index 9b09294..27d63bc 100644 (file)
@@ -13,9 +13,7 @@
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
  *
  * Author: David Zeuthen <davidz@redhat.com>
  */
@@ -24,9 +22,6 @@
 
 #include <stdlib.h>
 #include <string.h>
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
 
 #include "giotypes.h"
 #include "gsocket.h"
 #include "ginputstream.h"
 #include "gmemoryinputstream.h"
 #include "giostream.h"
+#include "glib/gstdio.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
 #include "gsocketoutputstream.h"
 
 #ifdef G_OS_UNIX
+#include "gkdbus.h"
+#include "gkdbusconnection.h"
 #include "gunixfdmessage.h"
 #include "gunixconnection.h"
 #include "gunixcredentialsmessage.h"
@@ -94,6 +92,107 @@ _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+typedef struct
+{
+  GKdbus *kdbus;
+  GCancellable *cancellable;
+
+  GSimpleAsyncResult *simple;
+
+  gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData  *data)
+{
+  g_object_unref (data->kdbus);
+  if (data->cancellable != NULL)
+    g_object_unref (data->cancellable);
+  g_object_unref (data->simple);
+  g_free (data);
+}
+
+static gboolean
+_g_kdbus_read_ready (GKdbus        *kdbus,
+                     GIOCondition   condition,
+                     gpointer       user_data)
+{
+  ReadKdbusData *data = user_data;
+  GError *error = NULL;
+  gssize result;
+
+  result = _g_kdbus_receive (data->kdbus,
+                             data->cancellable,
+                             &error);
+
+  if (result >= 0)
+    {
+      g_simple_async_result_set_op_res_gssize (data->simple, result);
+    }
+  else
+    {
+      g_assert (error != NULL);
+      g_simple_async_result_take_error (data->simple, error);
+    }
+
+  if (data->from_mainloop)
+    g_simple_async_result_complete (data->simple);
+  else
+    g_simple_async_result_complete_in_idle (data->simple);
+
+  return FALSE;
+}
+
+static void
+_g_kdbus_read (GKdbus               *kdbus,
+               GCancellable         *cancellable,
+               GAsyncReadyCallback   callback,
+               gpointer              user_data)
+{
+  ReadKdbusData *data;
+  GSource *source;
+
+  data = g_new0 (ReadKdbusData, 1);
+  data->kdbus = g_object_ref (kdbus);
+  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+
+  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+                                            callback,
+                                            user_data,
+                                            _g_kdbus_read);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+  data->from_mainloop = TRUE;
+  source = _g_kdbus_create_source (data->kdbus,
+                                   G_IO_IN,
+                                   cancellable);
+  g_source_set_callback (source,
+                         (GSourceFunc) _g_kdbus_read_ready,
+                         data,
+                         (GDestroyNotify) read_kdbus_data_free);
+  g_source_attach (source, g_main_context_get_thread_default ());
+  g_source_unref (source);
+}
+
+static gssize
+_g_kdbus_read_finish (GKdbus        *kdbus,
+                      GAsyncResult  *result,
+                      GError       **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    return -1;
+  else
+    return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+#endif /* defined (G_OS_UNIX) && (KDBUS_TRANSPORT) */
+
 /* Unfortunately ancillary messages are discarded when reading from a
  * socket using the GSocketInputStream abstraction. So we provide a
  * very GInputStream-ish API that uses GSocket in this case (very
@@ -363,8 +462,11 @@ struct GDBusWorker
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  /* if not NULL, stream is GSocketConnection */
+  /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
   GSocket *socket;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  GKdbus  *kdbus;
+#endif
 
   /* used for reading */
   GMutex                              read_lock;
@@ -375,6 +477,10 @@ struct GDBusWorker
   GUnixFDList                        *read_fd_list;
   GSocketControlMessage             **read_ancillary_messages;
   gint                                read_num_ancillary_messages;
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  GSList                             *read_kdbus_msg_items;
+#endif
+
 
   /* Whether an async write, flush or close, or none of those, is pending.
    * Only the worker thread may change its value, and only with the write_lock.
@@ -558,6 +664,7 @@ _g_dbus_worker_unfreeze (GDBusWorker *worker)
                          unfreeze_in_idle_cb,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 }
@@ -583,7 +690,29 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     goto out;
 
   error = NULL;
-  if (worker->socket == NULL)
+  bytes_read = 0;
+
+  if (FALSE)
+    {
+    }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  else if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      bytes_read = _g_kdbus_read_finish (worker->kdbus,
+                                         res,
+                                         &error);
+
+      /* [KDBUS]  Get all received items*/
+      worker->read_kdbus_msg_items = _g_kdbus_get_last_msg_items (worker->kdbus);
+
+      /* [KDBUS] Attach fds (if any) to worker->read_fd_list */
+      _g_kdbus_attach_fds_to_msg (worker->kdbus, &worker->read_fd_list);
+
+      /* [KDBUS] For KDBUS transport we read whole message at once*/
+      worker->read_buffer_bytes_wanted = bytes_read;
+    }
+#endif
+  else if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -621,7 +750,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                     {
                       /* TODO: really want a append_steal() */
                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
-                      close (fds[n]);
+                      (void) g_close (fds[n], NULL);
                     }
                 }
               g_free (fds);
@@ -662,9 +791,8 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
           _g_dbus_debug_print_lock ();
           g_print ("========================================================================\n"
                    "GDBus-debug:Transport:\n"
-                   "  ---- READ ERROR on stream of type %s:\n"
+                   "  ---- READ ERROR:\n"
                    "  ---- %s %d: %s\n",
-                   g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
                    g_quark_to_string (error->domain), error->code,
                    error->message);
           _g_dbus_debug_print_unlock ();
@@ -718,7 +846,9 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
       goto out;
     }
 
-  read_message_print_transport_debug (bytes_read, worker);
+  /* [KDBUS] don't print transport dbus debug for kdbus connection */
+  if (!G_IS_KDBUS_CONNECTION (worker->stream))
+    read_message_print_transport_debug (bytes_read, worker);
 
   worker->read_buffer_cur_size += bytes_read;
   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
@@ -734,7 +864,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                                                      &error);
           if (message_len == -1)
             {
-              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
+              g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
               g_error_free (error);
               goto out;
@@ -750,25 +880,61 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
 
           /* TODO: use connection->priv->auth to decode the message */
 
-          message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
-                                                  worker->read_buffer_cur_size,
-                                                  worker->capabilities,
-                                                  &error);
-          if (message == NULL)
+          if (FALSE)
             {
-              gchar *s;
-              s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
-              g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
-                         "The error is: %s\n"
-                         "The payload is as follows:\n"
-                         "%s\n",
-                         worker->read_buffer_cur_size,
-                         error->message,
-                         s);
-              g_free (s);
-              _g_dbus_worker_emit_disconnected (worker, FALSE, error);
-              g_error_free (error);
-              goto out;
+            }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+          else if (G_IS_KDBUS_CONNECTION (worker->stream))
+            {
+              GDBusMessageType message_type;
+              gchar *sender;
+              gchar *destination;
+
+              message = _g_dbus_message_new_from_kdbus_items (worker->read_kdbus_msg_items,
+                                                              &error);
+
+              /* [KDBUS] override informations from the user header with kernel msg header */
+              sender = _g_kdbus_get_last_msg_sender (worker->kdbus);
+              g_dbus_message_set_sender (message, sender);
+
+              message_type = g_dbus_message_get_message_type (message);
+              if (message_type == G_DBUS_MESSAGE_TYPE_SIGNAL)
+                {
+                  destination = _g_kdbus_get_last_msg_destination (worker->kdbus);
+                  g_dbus_message_set_destination (message, destination);
+                }
+
+              if (message == NULL)
+                {
+                   g_warning ("Error decoding D-Bus (kdbus) message\n");
+                   g_error_free (error);
+                   goto out;
+                }
+            }
+#endif
+          else
+            {
+              message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
+                                                      worker->read_buffer_cur_size,
+                                                      worker->capabilities,
+                                                      &error);
+
+              if (message == NULL)
+                {
+                  gchar *s;
+                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+                  g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
+                             "The error is: %s\n"
+                             "The payload is as follows:\n"
+                             "%s\n",
+                             worker->read_buffer_cur_size,
+                             error->message,
+                             s);
+                  g_free (s);
+                  _g_dbus_worker_emit_disconnected (worker, FALSE, error);
+                  g_error_free (error);
+                  goto out;
+                }
             }
 
 #ifdef G_OS_UNIX
@@ -793,7 +959,15 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
               g_free (s);
               if (G_UNLIKELY (_g_dbus_debug_payload ()))
                 {
-                  s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
+                  if (FALSE)
+                    {
+                    }
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+                  else if (G_IS_KDBUS_CONNECTION (worker->stream))
+                    s = _g_kdbus_hexdump_all_items (worker->read_kdbus_msg_items);
+#endif
+                  else
+                    s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
                   g_print ("%s\n", s);
                   g_free (s);
                 }
@@ -816,6 +990,20 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     }
 
  out:
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  /* [KDBUS] release memory occupied by kdbus message */
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      if (!_g_kdbus_is_closed (worker->kdbus))
+        {
+          _g_kdbus_release_kmsg (worker->kdbus);
+          worker->read_kdbus_msg_items = NULL;
+        }
+      worker->read_buffer = NULL;
+    }
+#endif
+
   g_mutex_unlock (&worker->read_lock);
 
   /* gives up the reference acquired when calling g_input_stream_read_async() */
@@ -830,6 +1018,24 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
    * true, because only failing a read causes us to signal 'closed'.
    */
 
+  /* [KDBUS]
+   * For KDBUS transport we don't  have to alloc buffer (worker->read_buffer)
+   * instead of it we use kdbus memory pool. On connection stage KDBUS client
+   * have to register a memory pool, large enough to  carry all backlog of
+   * data enqueued for the connection.
+   */
+
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      _g_kdbus_read(worker->kdbus,
+                    worker->cancellable,
+                    (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+                    _g_dbus_worker_ref (worker));
+      return;
+    }
+#endif
+
   /* if bytes_wanted is zero, it means start reading a message */
   if (worker->read_buffer_bytes_wanted == 0)
     {
@@ -963,6 +1169,7 @@ write_message_async_cb (GObject      *source_object,
  * write-lock is not held on entry
  * output_pending is PENDING_WRITE on entry
  */
+#ifdef G_OS_UNIX
 static gboolean
 on_socket_ready (GSocket      *socket,
                  GIOCondition  condition,
@@ -972,6 +1179,7 @@ on_socket_ready (GSocket      *socket,
   write_message_continue_writing (data);
   return FALSE; /* remove source */
 }
+#endif
 
 /* called in private thread shared by all GDBusConnection instances
  *
@@ -982,20 +1190,39 @@ static void
 write_message_continue_writing (MessageToWriteData *data)
 {
   GOutputStream *ostream;
-  GSimpleAsyncResult *simple;
+
 #ifdef G_OS_UNIX
+  GSimpleAsyncResult *simple;
   GUnixFDList *fd_list;
-#endif
 
   /* Note: we can't access data->simple after calling g_async_result_complete () because the
    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
    */
   simple = data->simple;
 
-  ostream = g_io_stream_get_output_stream (data->worker->stream);
-#ifdef G_OS_UNIX
   fd_list = g_dbus_message_get_unix_fd_list (data->message);
-#endif
+
+#ifdef KDBUS_TRANSPORT
+  if (G_IS_KDBUS_CONNECTION (data->worker->stream))
+    {
+      GError *error;
+      error = NULL;
+      data->total_written = _g_kdbus_send (data->worker,
+                                           data->worker->kdbus,
+                                           data->message,
+                                           fd_list,
+                                           data->worker->cancellable,
+                                           &error);
+
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
+    }
+#endif /* KDBUS_TRANSPORT */
+
+#endif /* G_OS_UNIX */
+
+  ostream = g_io_stream_get_output_stream (data->worker->stream);
 
   g_assert (!g_output_stream_has_pending (ostream));
   g_assert_cmpint (data->total_written, <, data->blob_size);
@@ -1106,7 +1333,9 @@ write_message_continue_writing (MessageToWriteData *data)
                                    write_message_async_cb,
                                    data);
     }
+#ifdef G_OS_UNIX
  out:
+#endif
   ;
 }
 
@@ -1229,11 +1458,33 @@ ostream_flush_cb (GObject      *source_object,
 static void
 start_flush (FlushAsyncData *data)
 {
-  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
-                               G_PRIORITY_DEFAULT,
-                               data->worker->cancellable,
-                               ostream_flush_cb,
-                               data);
+  /*[KDBUS]: TODO: to investigate */
+  if (G_IS_KDBUS_CONNECTION (data->worker->stream))
+    {
+      g_assert (data->flushers != NULL);
+      flush_data_list_complete (data->flushers, NULL);
+      g_list_free (data->flushers);
+
+      g_mutex_lock (&data->worker->write_lock);
+      data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
+      g_assert (data->worker->output_pending == PENDING_FLUSH);
+      data->worker->output_pending = PENDING_NONE;
+      g_mutex_unlock (&data->worker->write_lock);
+
+      /* OK, cool, finally kick off the next write */
+      continue_writing (data->worker);
+
+      _g_dbus_worker_unref (data->worker);
+      g_free (data);
+    }
+  else
+    {
+      g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                                   G_PRIORITY_DEFAULT,
+                                   data->worker->cancellable,
+                                   ostream_flush_cb,
+                                   data);
+    }
 }
 
 /* called in private thread shared by all GDBusConnection instances
@@ -1504,6 +1755,21 @@ continue_writing (GDBusWorker *worker)
         {
           /* filters altered the message -> reencode */
           error = NULL;
+
+         /* [KDBUS]
+          * Setting protocol version, before invoking g_dbus_message_to_blob() will
+          * be removed after preparing new function only for kdbus transport purposes
+          * (this function will be able to create blob directly/unconditionally in memfd
+          * object, without making copy):
+          *
+          * [1] https://code.google.com/p/d-bus/source/browse/TODO
+          */
+
+          if (G_IS_KDBUS_CONNECTION (worker->stream))
+            _g_dbus_message_set_protocol_ver (data->message,2);
+          else
+            _g_dbus_message_set_protocol_ver (data->message,1);
+
           new_blob = g_dbus_message_to_blob (data->message,
                                              &new_blob_size,
                                              worker->capabilities,
@@ -1552,7 +1818,7 @@ continue_writing_in_idle_cb (gpointer user_data)
   return FALSE;
 }
 
-/*
+/**
  * @write_data: (transfer full) (allow-none):
  * @flush_data: (transfer full) (allow-none):
  * @close_data: (transfer full) (allow-none):
@@ -1594,6 +1860,7 @@ schedule_writing_unlocked (GDBusWorker        *worker,
                              continue_writing_in_idle_cb,
                              _g_dbus_worker_ref (worker),
                              (GDestroyNotify) _g_dbus_worker_unref);
+      g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
       g_source_attach (idle_source, worker->shared_thread_data->context);
       g_source_unref (idle_source);
     }
@@ -1670,6 +1937,11 @@ _g_dbus_worker_new (GIOStream                              *stream,
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
+#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    worker->kdbus = _g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+#endif
+
   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
 
   /* begin reading */
@@ -1679,6 +1951,7 @@ _g_dbus_worker_new (GIOStream                              *stream,
                          _g_dbus_worker_do_initial_read,
                          _g_dbus_worker_ref (worker),
                          (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
   g_source_attach (idle_source, worker->shared_thread_data->context);
   g_source_unref (idle_source);
 
@@ -1731,7 +2004,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
    */
   _g_dbus_worker_close (worker, NULL, NULL);
 
-  /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+  /* _g_dbus_worker_close holds a ref until after an idle in the worker
    * thread has run, so we no longer need to unref in an idle like in
    * commit 322e25b535
    */
@@ -1903,7 +2176,7 @@ _g_dbus_debug_print_unlock (void)
   G_UNLOCK (print_lock);
 }
 
-/*
+/**
  * _g_dbus_initialize:
  *
  * Does various one-time init things such as
@@ -2148,7 +2421,7 @@ write_message_print_transport_debug (gssize bytes_written,
   _g_dbus_debug_print_lock ();
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
-           "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+           "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
            bytes_written,
            g_dbus_message_get_serial (data->message),
@@ -2199,7 +2472,7 @@ read_message_print_transport_debug (gssize bytes_read,
     _g_dbus_debug_print_lock ();
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
-           "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
+           "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
            bytes_read,
            serial,