[Receiving] async functions copied/modified from gsocket
authorMichal Eljasiewicz <m.eljasiewic@samsung.com>
Tue, 1 Oct 2013 09:17:32 +0000 (11:17 +0200)
committerLukasz Skalski <l.skalski@partner.samsung.com>
Fri, 25 Oct 2013 14:39:16 +0000 (16:39 +0200)
gio/gdbusprivate.c
gio/giotypes.h
gio/gkdbus.c
gio/gkdbus.h

index 6525db3..57e609e 100644 (file)
@@ -118,6 +118,29 @@ typedef struct
   gboolean from_mainloop;
 } ReadWithControlData;
 
+typedef struct
+{
+  GKdbus *kdbus;
+  GCancellable *cancellable;
+
+  void *buffer;
+  gsize count;
+
+  GSimpleAsyncResult *simple;
+
+  gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData *data)
+{
+  //g_object_unref (data->kdbus); TODO
+  if (data->cancellable != NULL)
+    g_object_unref (data->cancellable);
+  g_object_unref (data->simple);
+  g_free (data);
+}
+
 static void
 read_with_control_data_free (ReadWithControlData *data)
 {
@@ -129,6 +152,38 @@ read_with_control_data_free (ReadWithControlData *data)
 }
 
 static gboolean
+_g_kdbus_read_ready (GKdbus      *kdbus,
+                     GIOCondition  condition,
+                     gpointer      user_data)
+{
+  ReadKdbusData *data = user_data;
+  GError *error;
+  gssize result;
+  
+  error = NULL;
+  
+  result = g_kdbus_receive (data->kdbus,
+                            data->buffer,
+                            &error);
+  if (result >= 0)
+    {
+      g_simple_async_result_set_op_res_gssize (data->simple, result);
+    }
+  else
+    {
+      g_assert (error != NULL);
+      g_simple_async_result_take_error (data->simple, error);
+    }
+
+  if (data->from_mainloop)
+    g_simple_async_result_complete (data->simple);
+  else
+    g_simple_async_result_complete_in_idle (data->simple);
+
+  return FALSE;
+}
+
+static gboolean
 _g_socket_read_with_control_messages_ready (GSocket      *socket,
                                             GIOCondition  condition,
                                             gpointer      user_data)
@@ -169,6 +224,49 @@ _g_socket_read_with_control_messages_ready (GSocket      *socket,
 }
 
 static void
+_g_kdbus_read (GKdbus                  *kdbus,
+               void                    *buffer,
+               gsize                    count,
+               GCancellable            *cancellable,
+               GAsyncReadyCallback      callback,
+               gpointer                 user_data)
+{
+  ReadKdbusData *data;
+
+  data = g_new0 (ReadKdbusData, 1);
+  data->kdbus = kdbus; /*g_object_ref (socket);*/
+  data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+  data->buffer = buffer;
+  data->count = count;
+
+  data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+                                            callback,
+                                            user_data,
+                                            _g_kdbus_read);
+  g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+  if (!g_kdbus_condition_check (kdbus, G_IO_IN))
+    {
+      GSource *source;
+      data->from_mainloop = TRUE;
+      source = g_kdbus_create_source (data->kdbus,
+                                       G_IO_IN | G_IO_HUP | G_IO_ERR,
+                                       cancellable);
+      g_source_set_callback (source,
+                             (GSourceFunc) _g_kdbus_read_ready,
+                             data,
+                             (GDestroyNotify) read_kdbus_data_free);
+      g_source_attach (source, g_main_context_get_thread_default ());
+      g_source_unref (source);
+    }
+  else
+    {
+      _g_kdbus_read_ready (data->kdbus, G_IO_IN, data);
+      read_kdbus_data_free (data);
+    }
+}
+
+static void
 _g_socket_read_with_control_messages (GSocket                 *socket,
                                       void                    *buffer,
                                       gsize                    count,
@@ -217,6 +315,22 @@ _g_socket_read_with_control_messages (GSocket                 *socket,
 }
 
 static gssize
+_g_kdbus_read_finish (GKdbus       *kdbus,
+                                             GAsyncResult  *result,
+                                             GError       **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+      return -1;
+  else
+    return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+static gssize
 _g_socket_read_with_control_messages_finish (GSocket       *socket,
                                              GAsyncResult  *result,
                                              GError       **error)
@@ -586,7 +700,12 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
     goto out;
 
   error = NULL;
-  if (worker->socket == NULL)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      bytes_read = _g_kdbus_read_finish (worker->kdbus,
+                                         res,
+                                         &error);
+  } else if (worker->socket == NULL)
     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
                                              res,
                                              &error);
@@ -848,7 +967,21 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
     }
 
-  if (worker->socket == NULL)
+  if (G_IS_KDBUS_CONNECTION (worker->stream))
+    {
+      //GError *error;
+      //error = NULL;
+
+      
+      _g_kdbus_read(worker->kdbus, 
+                    worker->read_buffer,
+                    worker->read_buffer_bytes_wanted,
+                                            worker->cancellable,
+                                            (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+                                            _g_dbus_worker_ref (worker));
+      
+
+  } else if (worker->socket == NULL)
     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
                                worker->read_buffer + worker->read_buffer_cur_size,
                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
@@ -880,7 +1013,7 @@ _g_dbus_worker_do_initial_read (gpointer data)
   //g_mutex_lock (&worker->read_lock);
   //_g_dbus_worker_do_read_unlocked (worker);
   //g_mutex_unlock (&worker->read_lock);
-  //return FALSE;
+  return FALSE;
 }
 
 /* ---------------------------------------------------------------------------------------------------- */
index 1eb1ef7..3687de9 100644 (file)
@@ -369,6 +369,22 @@ typedef gboolean (*GSocketSourceFunc) (GSocket *socket,
                                       gpointer user_data);
 
 /**
+ * GKdbusSourceFunc:
+ * @socket: the #GKdbus
+ * @condition: the current condition at the source fired.
+ * @user_data: data passed in by the user.
+ *
+ * This is the function type of the callback used for the #GSource
+ * returned by g_kdbus_create_source().
+ *
+ * Returns: it should return %FALSE if the source should be removed.
+ *
+ */
+typedef gboolean (*GKdbusSourceFunc) (GKdbus *kdbus,
+                                      GIOCondition condition,
+                                      gpointer user_data);
+
+/**
  * GInputVector:
  * @buffer: Pointer to a buffer where data will be written.
  * @size: the available size in @buffer.
index 4475812..3059274 100644 (file)
@@ -52,6 +52,7 @@
 #include "kdbus.h"
 #include "gdbusmessage.h"
 #include "gdbusconnection.h"
+//#include "gclosure.h"
 
 #define KDBUS_PART_FOREACH(part, head, first)                          \
        for (part = (head)->first;                                      \
@@ -96,6 +97,8 @@ struct _GKdbusPrivate
   gchar          *buffer_ptr;
   gint            peer_id;
   gchar          *sender;
+  guint           timeout;
+  guint           timed_out : 1;
 };
 
 // TODO:
@@ -298,6 +301,29 @@ gboolean g_kdbus_register(GKdbus           *kdbus)
        return TRUE;
 }
 
+GIOCondition
+g_kdbus_condition_check(GKdbus *kdbus,
+                                         GIOCondition  condition)
+{
+  GPollFD poll_fd;
+  gint result;  
+  g_return_val_if_fail (G_IS_KDBUS (kdbus), 0);
+
+  //if (!check_socket (socket, NULL)) TODO !check for valid kdbus!
+  //  return 0;  
+  poll_fd.fd = kdbus->priv->fd;
+  poll_fd.events = condition;
+  poll_fd.revents = 0;
+
+  do
+    result = g_poll (&poll_fd, 1, 0);
+  while (result == -1 && errno == EINTR);
+
+  return poll_fd.revents;
+  
+}
+
+
 /*
  * g_kdbus_decode_msg:
  * @kdbus_msg: kdbus message received into buffer
@@ -368,6 +394,191 @@ g_kdbus_decode_msg(GKdbus           *kdbus,
   return ret_size;
 }
 
+typedef struct {
+  GSource       source;
+  GPollFD       pollfd;
+  GKdbus       *kdbus;
+  GIOCondition  condition;
+  GCancellable *cancellable;
+  GPollFD       cancel_pollfd;
+  gint64        timeout_time;
+} GKdbusSource;
+
+static gboolean
+kdbus_source_prepare (GSource *source,
+                      gint    *timeout)
+{
+  GKdbusSource *kdbus_source = (GKdbusSource *)source;
+
+  if (g_cancellable_is_cancelled (kdbus_source->cancellable))
+    return TRUE;
+
+  if (kdbus_source->timeout_time)
+    {
+      gint64 now;
+
+      now = g_source_get_time (source);
+      /* Round up to ensure that we don't try again too early */
+      *timeout = (kdbus_source->timeout_time - now + 999) / 1000;
+      if (*timeout < 0)
+        {
+          kdbus_source->kdbus->priv->timed_out = TRUE;
+          *timeout = 0;
+          return TRUE;
+        }
+    }
+  else
+    *timeout = -1;
+
+  if ((kdbus_source->condition & kdbus_source->pollfd.revents) != 0)
+    return TRUE;
+
+  return FALSE;
+}
+
+static gboolean
+kdbus_source_check (GSource *source)
+{
+  int timeout;
+
+  return kdbus_source_prepare (source, &timeout);
+}
+
+static gboolean
+kdbus_source_dispatch (GSource     *source,
+                       GSourceFunc  callback,
+                       gpointer     user_data)
+{
+  GKdbusSourceFunc func = (GKdbusSourceFunc)callback;
+  GKdbusSource *kdbus_source = (GKdbusSource *)source;
+  GKdbus *kdbus = kdbus_source->kdbus;
+  gboolean ret;
+
+  if (kdbus_source->kdbus->priv->timed_out)
+    kdbus_source->pollfd.revents |= kdbus_source->condition & (G_IO_IN | G_IO_OUT);
+
+  ret = (*func) (kdbus,
+                kdbus_source->pollfd.revents & kdbus_source->condition,
+                user_data);
+
+  if (kdbus->priv->timeout)
+    kdbus_source->timeout_time = g_get_monotonic_time () +
+                                  kdbus->priv->timeout * 1000000;
+
+  else
+    kdbus_source->timeout_time = 0;
+
+  return ret;
+}
+
+static void
+kdbus_source_finalize (GSource *source)
+{
+  GKdbusSource *kdbus_source = (GKdbusSource *)source;
+  GKdbus *kdbus;
+
+  kdbus = kdbus_source->kdbus;
+
+  g_object_unref (kdbus);
+
+  if (kdbus_source->cancellable)
+    {
+      g_cancellable_release_fd (kdbus_source->cancellable);
+      g_object_unref (kdbus_source->cancellable);
+    }
+}
+
+static gboolean
+kdbus_source_closure_callback (GKdbus      *kdbus,
+                               GIOCondition  condition,
+                               gpointer      data)
+{
+  GClosure *closure = data;
+
+  GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT };
+  GValue result_value = G_VALUE_INIT;
+  gboolean result;
+
+  g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+  g_value_init (&params[0], G_TYPE_KDBUS);
+  g_value_set_object (&params[0], kdbus);
+  g_value_init (&params[1], G_TYPE_IO_CONDITION);
+  g_value_set_flags (&params[1], condition);
+
+  g_closure_invoke (closure, &result_value, 2, params, NULL);
+
+  result = g_value_get_boolean (&result_value);
+  g_value_unset (&result_value);
+  g_value_unset (&params[0]);
+  g_value_unset (&params[1]);
+
+  return result;
+}
+
+static GSourceFuncs kdbus_source_funcs =
+{
+  kdbus_source_prepare,
+  kdbus_source_check,
+  kdbus_source_dispatch,
+  kdbus_source_finalize,
+  (GSourceFunc)kdbus_source_closure_callback,
+  (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+/*
+ * TODO Windows cases removed when 
+ */
+
+static GSource *
+kdbus_source_new (GKdbus      *kdbus,
+                  GIOCondition  condition,
+                  GCancellable *cancellable)
+{
+  GSource *source;
+  GKdbusSource *kdbus_source;
+
+  condition |= G_IO_HUP | G_IO_ERR;
+
+  source = g_source_new (&kdbus_source_funcs, sizeof (GKdbusSource));
+  g_source_set_name (source, "GKdbus");
+  kdbus_source = (GKdbusSource *)source;
+
+  kdbus_source->kdbus = g_object_ref (kdbus);
+  kdbus_source->condition = condition;
+
+  if (g_cancellable_make_pollfd (cancellable,
+                                 &kdbus_source->cancel_pollfd))
+    {
+      kdbus_source->cancellable = g_object_ref (cancellable);
+      g_source_add_poll (source, &kdbus_source->cancel_pollfd);
+    }
+
+  kdbus_source->pollfd.fd = kdbus->priv->fd;
+  kdbus_source->pollfd.events = condition;
+  kdbus_source->pollfd.revents = 0;
+  g_source_add_poll (source, &kdbus_source->pollfd);
+
+  if (kdbus->priv->timeout)
+    kdbus_source->timeout_time = g_get_monotonic_time () +
+                                  kdbus->priv->timeout * 1000000;
+
+  else
+    kdbus_source->timeout_time = 0;
+
+  return source;
+}
+
+GSource *              
+g_kdbus_create_source (GKdbus                 *kdbus,
+                                                                GIOCondition            condition,
+                                                                GCancellable           *cancellable)
+{
+  g_return_val_if_fail (G_IS_KDBUS (kdbus) && (cancellable == NULL || G_IS_CANCELLABLE (cancellable)), NULL);
+
+  return kdbus_source_new (kdbus, condition, cancellable);
+}
+
 /*
  * g_kdbus_receive:
  * @kdbus: a #GKdbus
@@ -376,10 +587,10 @@ g_kdbus_decode_msg(GKdbus           *kdbus,
  */
 gssize
 g_kdbus_receive (GKdbus       *kdbus,
-                 void         *data,
+                 char         *data,
                             GError       **error)
 {
-  int ret_size;
+  int ret_size = 0;
   guint64 __attribute__ ((__aligned__(8))) offset;
   struct kdbus_msg *msg;
 
@@ -389,6 +600,7 @@ g_kdbus_receive (GKdbus       *kdbus,
   {
          if(errno == EINTR)
                  goto again;
+    g_print ("g_kdbus_receive: ioctl MSG_RECV failed! \n");
          return -1;
   }
 
@@ -402,6 +614,7 @@ g_kdbus_receive (GKdbus       *kdbus,
        {
                if(errno == EINTR)
                        goto again2;
+    g_print ("g_kdbus_receive: ioctl MSG_RELEASE failed! \n");
                return -1;
        }
   
@@ -447,6 +660,7 @@ g_kdbus_send_reply(GDBusWorker     *worker,
 /**
  * g_kdbus_send_message:
  * @kdbus: a #GKdbus
+ * TODO handle errors
  */
 gssize
 g_kdbus_send_message (GDBusWorker     *worker,
index c42e53e..e21e91d 100644 (file)
@@ -89,7 +89,7 @@ GLIB_AVAILABLE_IN_ALL
 gboolean                           g_kdbus_is_closed                     (GKdbus           *kdbus);
 GLIB_AVAILABLE_IN_ALL
 gssize                  g_kdbus_receive                 (GKdbus       *kdbus,
-                                                         void         *data,
+                                                         char         *data,
                                                                     GError       **error);
 GLIB_AVAILABLE_IN_ALL
 gssize                  g_kdbus_send_message            (GDBusWorker     *worker,
@@ -102,6 +102,13 @@ GLIB_AVAILABLE_IN_ALL
 gboolean                g_kdbus_register                (GKdbus           *kdbus);
 GLIB_AVAILABLE_IN_ALL
 gchar*                  g_kdbus_get_sender                (GKdbus           *kdbus);
+GLIB_AVAILABLE_IN_ALL
+GSource *              g_kdbus_create_source           (GKdbus                 *kdbus,
+                                                                                                  GIOCondition             condition,
+                                                                                                  GCancellable            *cancellable);
+GLIB_AVAILABLE_IN_ALL
+GIOCondition           g_kdbus_condition_check         (GKdbus                 *kdbus,
+                                                                                                 GIOCondition             condition);
 
 G_END_DECLS