Fixed problem with msg destination in gkdbus.c. Cleanup code.
[platform/upstream/glib.git] / gio / gdbusprivate.c
index 5b3b77d..38be85d 100644 (file)
@@ -39,6 +39,7 @@
 #include "ginputstream.h"
 #include "gmemoryinputstream.h"
 #include "giostream.h"
+#include "glib/gstdio.h"
 #include "gsocketcontrolmessage.h"
 #include "gsocketconnection.h"
 #include "gsocketoutputstream.h"
@@ -46,6 +47,7 @@
 #ifdef G_OS_UNIX
 #include "gunixfdmessage.h"
 #include "gunixconnection.h"
+#include "gkdbusconnection.h"
 #include "gunixcredentialsmessage.h"
 #endif
 
@@ -116,6 +118,29 @@ typedef struct
   gboolean from_mainloop;
 } ReadWithControlData;
 
+typedef struct
+{
+  GKdbus *kdbus;
+  GCancellable *cancellable;
+
+  void *buffer;
+  gsize count;
+
+  GSimpleAsyncResult *simple;
+
+  gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData *data)
+{
+  //g_object_unref (data->kdbus); TODO
+  if (data->cancellable != NULL)
+    g_object_unref (data->cancellable);
+  g_object_unref (data->simple);
+  g_free (data);
+}
+
 static void
 read_with_control_data_free (ReadWithControlData *data)
 {
@@ -127,6 +152,38 @@ read_with_control_data_free (ReadWithControlData *data)
 }
 
 static gboolean
+_g_kdbus_read_ready (GKdbus      *kdbus,
+                     GIOCondition  condition,
+                     gpointer      user_data)
+{
+  ReadKdbusData *data = user_data;
+  GError *error;
+  gssize result;
+  
+  error = NULL;
+  
+  result = g_kdbus_receive (data->kdbus,
+                            data->buffer,
+                            &error);
+  if (result >= 0)
+    {
+      g_simple_async_result_set_op_res_gssize (data->simple, result);
+    }
+  else
+    {
+      g_assert (error != NULL);
+      g_simple_async_result_take_error (data->simple, error);
+    }
+
+  if (data->from_mainloop)
+    g_simple_async_result_complete (data->simple);
+  else
+    g_simple_async_result_complete_in_idle (data->simple);
+
+  return FALSE;
+}
+
+static gboolean
 _g_socket_read_with_control_messages_ready (GSocket      *socket,
                                             GIOCondition  condition,
                                             gpointer      user_data)
@@ -167,6 +224,41 @@ _g_socket_read_with_control_messages_ready (GSocket      *socket,
 }
 
 static void
+_g_kdbus_read (GKdbus                  *kdbus,
+               void                    *buffer,
+               gsize                    count,
+               GCancellable            *cancellable,
+               GAsyncReadyCallback      callback,
+               gpointer                 user_data)
+{
+  ReadKdbusData *data;
+  GSource *source;
+
+  data = g_new0 (ReadKdbusData, 1);
+  data->kdbus = kdbus; /*g_object_ref (socket);*/
+  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+  data->buffer = buffer;
+  data->count = count;
+
+  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+                                            callback,
+                                            user_data,
+                                            _g_kdbus_read);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+  data->from_mainloop = TRUE;
+  source = g_kdbus_create_source (data->kdbus,
+                                   G_IO_IN | G_IO_HUP | G_IO_ERR,
+                                   cancellable);
+  g_source_set_callback (source,
+                         (GSourceFunc) _g_kdbus_read_ready,
+                         data,
+                         (GDestroyNotify) read_kdbus_data_free);
+  g_source_attach (source, g_main_context_get_thread_default ());
+  g_source_unref (source);
+}
+
+static void
 _g_socket_read_with_control_messages (GSocket                 *socket,
                                       void                    *buffer,
                                       gsize                    count,
@@ -191,6 +283,7 @@ _g_socket_read_with_control_messages (GSocket                 *socket,
                                             callback,
                                             user_data,
                                             _g_socket_read_with_control_messages);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
 
   if (!g_socket_condition_check (socket, G_IO_IN))
     {
@@ -214,6 +307,22 @@ _g_socket_read_with_control_messages (GSocket                 *socket,
 }
 
 static gssize
+_g_kdbus_read_finish (GKdbus       *kdbus,
+                                             GAsyncResult  *result,
+                                             GError       **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+      return -1;
+  else
+    return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+static gssize
 _g_socket_read_with_control_messages_finish (GSocket       *socket,
                                              GAsyncResult  *result,
                                              GError       **error)
@@ -362,8 +471,9 @@ struct GDBusWorker
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
-  /* if not NULL, stream is GSocketConnection */
+  /* if GSocket and GKdbus are NULL, stream is GSocketConnection */
   GSocket *socket;
+  GKdbus  *kdbus;
 
   /* used for reading */
   GMutex                              read_lock;
@@ -465,13 +575,9 @@ _g_dbus_worker_unref (GDBusWorker *worker)
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
 
-      g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
-      g_queue_free (worker->received_messages_while_frozen);
-
+      g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
       g_mutex_clear (&worker->write_lock);
-      g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
-      g_queue_free (worker->write_queue);
-
+      g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
       g_free (worker->read_buffer);
 
       g_free (worker);
@@ -508,7 +614,7 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
 }
 
 /* can only be called from private thread with read-lock held - takes ownership of @message */
-static void
+void
 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
                                                   GDBusMessage *message)
 {
@@ -586,7 +692,12 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     goto out;
 
   error = NULL;
-  if (worker->socket == NULL)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      bytes_read = _g_kdbus_read_finish (worker->kdbus,
+                                         res,
+                                         &error);
+  } else if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -624,7 +735,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                     {
                       /* TODO: really want a append_steal() */
                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
-                      close (fds[n]);
+                      (void) g_close (fds[n], NULL);
                     }
                 }
               g_free (fds);
@@ -724,6 +835,11 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
   read_message_print_transport_debug (bytes_read, worker);
 
   worker->read_buffer_cur_size += bytes_read;
+
+  /* TODO: [KDBUS] Sprawdzic pole read_buffer_bytes_wanted */
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    worker->read_buffer_bytes_wanted = worker->read_buffer_cur_size;
+
   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
     {
       /* OK, got what we asked for! */
@@ -737,7 +853,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
                                                      &error);
           if (message_len == -1)
             {
-              g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message);
+              g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
               g_error_free (error);
               goto out;
@@ -848,7 +964,17 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
     }
 
-  if (worker->socket == NULL)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      _g_kdbus_read(worker->kdbus, 
+                    worker->read_buffer,
+                    worker->read_buffer_bytes_wanted,
+                                            worker->cancellable,
+                                            (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+                                            _g_dbus_worker_ref (worker));
+      
+
+  } else if (worker->socket == NULL)
     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
                                worker->read_buffer + worker->read_buffer_cur_size,
                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
@@ -966,6 +1092,7 @@ write_message_async_cb (GObject      *source_object,
  * write-lock is not held on entry
  * output_pending is PENDING_WRITE on entry
  */
+#ifdef G_OS_UNIX
 static gboolean
 on_socket_ready (GSocket      *socket,
                  GIOCondition  condition,
@@ -975,6 +1102,7 @@ on_socket_ready (GSocket      *socket,
   write_message_continue_writing (data);
   return FALSE; /* remove source */
 }
+#endif
 
 /* called in private thread shared by all GDBusConnection instances
  *
@@ -984,132 +1112,147 @@ on_socket_ready (GSocket      *socket,
 static void
 write_message_continue_writing (MessageToWriteData *data)
 {
-  GOutputStream *ostream;
-  GSimpleAsyncResult *simple;
 #ifdef G_OS_UNIX
-  GUnixFDList *fd_list;
-#endif
-
-  /* Note: we can't access data->simple after calling g_async_result_complete () because the
-   * callback can free @data and we're not completing in idle. So use a copy of the pointer.
-   */
+  GSimpleAsyncResult *simple;
   simple = data->simple;
-
-  ostream = g_io_stream_get_output_stream (data->worker->stream);
-#ifdef G_OS_UNIX
-  fd_list = g_dbus_message_get_unix_fd_list (data->message);
 #endif
 
-  g_assert (!g_output_stream_has_pending (ostream));
-  g_assert_cmpint (data->total_written, <, data->blob_size);
-
-  if (FALSE)
+  if (G_IS_KDBUS_CONNECTION (data->worker->stream))
     {
+      GError *error;
+      error = NULL;
+      data->total_written = g_kdbus_send_message(data->worker, data->worker->kdbus, data->message, data->blob, data->blob_size, &error);
+    
+      write_message_print_transport_debug (data->total_written, data);  
+      g_simple_async_result_complete (simple);
+      g_object_unref (simple);
+      goto out;
     }
-#ifdef G_OS_UNIX
-  else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
+  else
     {
-      GOutputVector vector;
-      GSocketControlMessage *control_message;
-      gssize bytes_written;
-      GError *error;
-
-      vector.buffer = data->blob;
-      vector.size = data->blob_size;
+      GOutputStream *ostream;
+#ifdef G_OS_UNIX
+      GUnixFDList *fd_list;
+#endif
 
-      control_message = NULL;
-      if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
-        {
-          if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
-            {
-              g_simple_async_result_set_error (simple,
-                                               G_IO_ERROR,
-                                               G_IO_ERROR_FAILED,
-                                               "Tried sending a file descriptor but remote peer does not support this capability");
-              g_simple_async_result_complete (simple);
-              g_object_unref (simple);
-              goto out;
-            }
-          control_message = g_unix_fd_message_new_with_fd_list (fd_list);
-        }
+      ostream = g_io_stream_get_output_stream (data->worker->stream);
+#ifdef G_OS_UNIX
+      fd_list = g_dbus_message_get_unix_fd_list (data->message);
+#endif
 
-      error = NULL;
-      bytes_written = g_socket_send_message (data->worker->socket,
-                                             NULL, /* address */
-                                             &vector,
-                                             1,
-                                             control_message != NULL ? &control_message : NULL,
-                                             control_message != NULL ? 1 : 0,
-                                             G_SOCKET_MSG_NONE,
-                                             data->worker->cancellable,
-                                             &error);
-      if (control_message != NULL)
-        g_object_unref (control_message);
+      g_assert (!g_output_stream_has_pending (ostream));
+      g_assert_cmpint (data->total_written, <, data->blob_size);
 
-      if (bytes_written == -1)
+      if (FALSE)
         {
-          /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
-          if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-            {
-              GSource *source;
-              source = g_socket_create_source (data->worker->socket,
-                                               G_IO_OUT | G_IO_HUP | G_IO_ERR,
-                                               data->worker->cancellable);
-              g_source_set_callback (source,
-                                     (GSourceFunc) on_socket_ready,
-                                     data,
-                                     NULL); /* GDestroyNotify */
-              g_source_attach (source, g_main_context_get_thread_default ());
-              g_source_unref (source);
-              g_error_free (error);
-              goto out;
-            }
-          g_simple_async_result_take_error (simple, error);
-          g_simple_async_result_complete (simple);
-          g_object_unref (simple);
-          goto out;
         }
-      g_assert (bytes_written > 0); /* zero is never returned */
-
-      write_message_print_transport_debug (bytes_written, data);
-
-      data->total_written += bytes_written;
-      g_assert (data->total_written <= data->blob_size);
-      if (data->total_written == data->blob_size)
+#ifdef G_OS_UNIX
+      else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
         {
-          g_simple_async_result_complete (simple);
-          g_object_unref (simple);
-          goto out;
-        }
-
-      write_message_continue_writing (data);
-    }
+           GOutputVector vector;
+           GSocketControlMessage *control_message;
+           gssize bytes_written;
+           GError *error;
+
+           vector.buffer = data->blob;
+           vector.size = data->blob_size;
+
+           control_message = NULL;
+           if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
+             {
+               if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
+                 {
+                   g_simple_async_result_set_error (simple,
+                                                    G_IO_ERROR,
+                                                    G_IO_ERROR_FAILED,
+                                                    "Tried sending a file descriptor but remote peer does not support this capability");
+                   g_simple_async_result_complete (simple);
+                   g_object_unref (simple);
+                   goto out;
+                 }
+                control_message = g_unix_fd_message_new_with_fd_list (fd_list);
+              }
+
+            error = NULL;
+            bytes_written = g_socket_send_message (data->worker->socket,
+                                                  NULL, /* address */
+                                                  &vector,
+                                                  1,
+                                                  control_message != NULL ? &control_message : NULL,
+                                                  control_message != NULL ? 1 : 0,
+                                                  G_SOCKET_MSG_NONE,
+                                                  data->worker->cancellable,
+                                                  &error);
+           if (control_message != NULL)
+             g_object_unref (control_message);
+
+           if (bytes_written == -1)
+             {
+               /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
+               if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+                 {
+                   GSource *source;
+                   source = g_socket_create_source (data->worker->socket,
+                                                    G_IO_OUT | G_IO_HUP | G_IO_ERR,
+                                                    data->worker->cancellable);
+                   g_source_set_callback (source,
+                                          (GSourceFunc) on_socket_ready,
+                                          data,
+                                          NULL); /* GDestroyNotify */
+                   g_source_attach (source, g_main_context_get_thread_default ());
+                   g_source_unref (source);
+                   g_error_free (error);
+                   goto out;
+                 }
+               g_simple_async_result_take_error (simple, error);
+               g_simple_async_result_complete (simple);
+               g_object_unref (simple);
+               goto out;
+             }
+           g_assert (bytes_written > 0); /* zero is never returned */
+
+           write_message_print_transport_debug (bytes_written, data);
+
+           data->total_written += bytes_written;
+           g_assert (data->total_written <= data->blob_size);
+           if (data->total_written == data->blob_size)
+             {
+               g_simple_async_result_complete (simple);
+               g_object_unref (simple);
+               goto out;
+             }
+
+           write_message_continue_writing (data);
+         }
 #endif
-  else
-    {
+       else
+         {
 #ifdef G_OS_UNIX
-      if (fd_list != NULL)
-        {
-          g_simple_async_result_set_error (simple,
-                                           G_IO_ERROR,
-                                           G_IO_ERROR_FAILED,
-                                           "Tried sending a file descriptor on unsupported stream of type %s",
-                                           g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
-          g_simple_async_result_complete (simple);
-          g_object_unref (simple);
-          goto out;
-        }
+           if (fd_list != NULL)
+             {
+               g_simple_async_result_set_error (simple,
+                                                G_IO_ERROR,
+                                                G_IO_ERROR_FAILED,
+                                                "Tried sending a file descriptor on unsupported stream of type %s",
+                                                g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
+               g_simple_async_result_complete (simple);
+               g_object_unref (simple);
+               goto out;
+             }
 #endif
 
-      g_output_stream_write_async (ostream,
-                                   (const gchar *) data->blob + data->total_written,
-                                   data->blob_size - data->total_written,
-                                   G_PRIORITY_DEFAULT,
-                                   data->worker->cancellable,
-                                   write_message_async_cb,
-                                   data);
+            g_output_stream_write_async (ostream,
+                                        (const gchar *) data->blob + data->total_written,
+                                        data->blob_size - data->total_written,
+                                        G_PRIORITY_DEFAULT,
+                                        data->worker->cancellable,
+                                        write_message_async_cb,
+                                        data);
+         }
     }
+#ifdef G_OS_UNIX
  out:
+#endif
   ;
 }
 
@@ -1411,9 +1554,7 @@ iostream_close_cb (GObject      *source_object,
   g_clear_error (&error);
 
   /* all messages queued for sending are discarded */
-  g_queue_foreach (send_queue, (GFunc) message_to_write_data_free, NULL);
-  g_queue_free (send_queue);
-
+  g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
   /* all queued flushes fail */
   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
                        _("Operation was cancelled"));
@@ -1675,6 +1816,9 @@ _g_dbus_worker_new (GIOStream                              *stream,
   if (G_IS_SOCKET_CONNECTION (worker->stream))
     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
 
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    worker->kdbus = g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream));
+
   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
 
   /* begin reading */
@@ -1736,7 +1880,7 @@ _g_dbus_worker_stop (GDBusWorker *worker)
    */
   _g_dbus_worker_close (worker, NULL, NULL);
 
-  /* _g_dbus_worker_close holds a ref until after an idle in the the worker
+  /* _g_dbus_worker_close holds a ref until after an idle in the worker
    * thread has run, so we no longer need to unref in an idle like in
    * commit 322e25b535
    */
@@ -2053,6 +2197,47 @@ out:
 gchar *
 _g_dbus_get_machine_id (GError **error)
 {
+#ifdef G_OS_WIN32
+  HW_PROFILE_INFOA info;
+  char *src, *dest, *res;
+  int i;
+
+  if (!GetCurrentHwProfileA (&info))
+    {
+      char *message = g_win32_error_message (GetLastError ());
+      g_set_error (error,
+                  G_IO_ERROR,
+                  G_IO_ERROR_FAILED,
+                  _("Unable to get Hardware profile: %s"), message);
+      g_free (message);
+      return NULL;
+    }
+
+  /* Form: {12340001-4980-1920-6788-123456789012} */
+  src = &info.szHwProfileGuid[0];
+
+  res = g_malloc (32+1);
+  dest = res;
+
+  src++; /* Skip { */
+  for (i = 0; i < 8; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 4; i++)
+    *dest++ = *src++;
+  src++; /* Skip - */
+  for (i = 0; i < 12; i++)
+    *dest++ = *src++;
+  *dest = 0;
+
+  return res;
+#else
   gchar *ret;
   GError *first_error;
   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
@@ -2078,6 +2263,7 @@ _g_dbus_get_machine_id (GError **error)
       g_strstrip (ret);
     }
   return ret;
+#endif
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
@@ -2112,12 +2298,11 @@ write_message_print_transport_debug (gssize bytes_written,
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
            "  >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
-           "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
+           "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT "\n",
            bytes_written,
            g_dbus_message_get_serial (data->message),
            data->blob_size,
-           data->total_written,
-           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
+           data->total_written);
   _g_dbus_debug_print_unlock ();
  out:
   ;
@@ -2163,12 +2348,11 @@ read_message_print_transport_debug (gssize bytes_read,
   g_print ("========================================================================\n"
            "GDBus-debug:Transport:\n"
            "  <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n"
-           "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
+           "       size %d to offset %" G_GSIZE_FORMAT "\n",
            bytes_read,
            serial,
            message_length,
-           worker->read_buffer_cur_size,
-           g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
+           worker->read_buffer_cur_size);
   _g_dbus_debug_print_unlock ();
  out:
   ;