gboolean from_mainloop;
} ReadWithControlData;
+typedef struct
+{
+ GKdbus *kdbus;
+ GCancellable *cancellable;
+
+ void *buffer;
+ gsize count;
+
+ GSimpleAsyncResult *simple;
+
+ gboolean from_mainloop;
+} ReadKdbusData;
+
+static void
+read_kdbus_data_free (ReadKdbusData *data)
+{
+ //g_object_unref (data->kdbus); TODO
+ if (data->cancellable != NULL)
+ g_object_unref (data->cancellable);
+ g_object_unref (data->simple);
+ g_free (data);
+}
+
static void
read_with_control_data_free (ReadWithControlData *data)
{
}
static gboolean
+_g_kdbus_read_ready (GKdbus *kdbus,
+ GIOCondition condition,
+ gpointer user_data)
+{
+ ReadKdbusData *data = user_data;
+ GError *error;
+ gssize result;
+
+ error = NULL;
+
+ result = g_kdbus_receive (data->kdbus,
+ data->buffer,
+ &error);
+ if (result >= 0)
+ {
+ g_simple_async_result_set_op_res_gssize (data->simple, result);
+ }
+ else
+ {
+ g_assert (error != NULL);
+ g_simple_async_result_take_error (data->simple, error);
+ }
+
+ if (data->from_mainloop)
+ g_simple_async_result_complete (data->simple);
+ else
+ g_simple_async_result_complete_in_idle (data->simple);
+
+ return FALSE;
+}
+
+static gboolean
_g_socket_read_with_control_messages_ready (GSocket *socket,
GIOCondition condition,
gpointer user_data)
}
static void
+_g_kdbus_read (GKdbus *kdbus,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ ReadKdbusData *data;
+
+ data = g_new0 (ReadKdbusData, 1);
+ data->kdbus = kdbus; /*g_object_ref (socket);*/
+ data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL;
+ data->buffer = buffer;
+ data->count = count;
+
+ data->simple = g_simple_async_result_new (G_OBJECT (kdbus),
+ callback,
+ user_data,
+ _g_kdbus_read);
+ g_simple_async_result_set_check_cancellable (data->simple, cancellable);
+
+ if (!g_kdbus_condition_check (kdbus, G_IO_IN))
+ {
+ GSource *source;
+ data->from_mainloop = TRUE;
+ source = g_kdbus_create_source (data->kdbus,
+ G_IO_IN | G_IO_HUP | G_IO_ERR,
+ cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) _g_kdbus_read_ready,
+ data,
+ (GDestroyNotify) read_kdbus_data_free);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ }
+ else
+ {
+ _g_kdbus_read_ready (data->kdbus, G_IO_IN, data);
+ read_kdbus_data_free (data);
+ }
+}
+
+static void
_g_socket_read_with_control_messages (GSocket *socket,
void *buffer,
gsize count,
}
static gssize
+_g_kdbus_read_finish (GKdbus *kdbus,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ g_return_val_if_fail (G_IS_KDBUS (kdbus), -1);
+ g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return -1;
+ else
+ return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+static gssize
_g_socket_read_with_control_messages_finish (GSocket *socket,
GAsyncResult *result,
GError **error)
goto out;
error = NULL;
- if (worker->socket == NULL)
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ bytes_read = _g_kdbus_read_finish (worker->kdbus,
+ res,
+ &error);
+ } else if (worker->socket == NULL)
bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
res,
&error);
worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
}
- if (worker->socket == NULL)
+ if (G_IS_KDBUS_CONNECTION (worker->stream))
+ {
+ //GError *error;
+ //error = NULL;
+
+
+ _g_kdbus_read(worker->kdbus,
+ worker->read_buffer,
+ worker->read_buffer_bytes_wanted,
+ worker->cancellable,
+ (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
+ _g_dbus_worker_ref (worker));
+
+
+ } else if (worker->socket == NULL)
g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
worker->read_buffer + worker->read_buffer_cur_size,
worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
//g_mutex_lock (&worker->read_lock);
//_g_dbus_worker_do_read_unlocked (worker);
//g_mutex_unlock (&worker->read_lock);
- //return FALSE;
+ return FALSE;
}
/* ---------------------------------------------------------------------------------------------------- */
#include "kdbus.h"
#include "gdbusmessage.h"
#include "gdbusconnection.h"
+//#include "gclosure.h"
#define KDBUS_PART_FOREACH(part, head, first) \
for (part = (head)->first; \
gchar *buffer_ptr;
gint peer_id;
gchar *sender;
+ guint timeout;
+ guint timed_out : 1;
};
// TODO:
return TRUE;
}
+GIOCondition
+g_kdbus_condition_check(GKdbus *kdbus,
+ GIOCondition condition)
+{
+ GPollFD poll_fd;
+ gint result;
+ g_return_val_if_fail (G_IS_KDBUS (kdbus), 0);
+
+ //if (!check_socket (socket, NULL)) TODO !check for valid kdbus!
+ // return 0;
+ poll_fd.fd = kdbus->priv->fd;
+ poll_fd.events = condition;
+ poll_fd.revents = 0;
+
+ do
+ result = g_poll (&poll_fd, 1, 0);
+ while (result == -1 && errno == EINTR);
+
+ return poll_fd.revents;
+
+}
+
+
/*
* g_kdbus_decode_msg:
* @kdbus_msg: kdbus message received into buffer
return ret_size;
}
+typedef struct {
+ GSource source;
+ GPollFD pollfd;
+ GKdbus *kdbus;
+ GIOCondition condition;
+ GCancellable *cancellable;
+ GPollFD cancel_pollfd;
+ gint64 timeout_time;
+} GKdbusSource;
+
+static gboolean
+kdbus_source_prepare (GSource *source,
+ gint *timeout)
+{
+ GKdbusSource *kdbus_source = (GKdbusSource *)source;
+
+ if (g_cancellable_is_cancelled (kdbus_source->cancellable))
+ return TRUE;
+
+ if (kdbus_source->timeout_time)
+ {
+ gint64 now;
+
+ now = g_source_get_time (source);
+ /* Round up to ensure that we don't try again too early */
+ *timeout = (kdbus_source->timeout_time - now + 999) / 1000;
+ if (*timeout < 0)
+ {
+ kdbus_source->kdbus->priv->timed_out = TRUE;
+ *timeout = 0;
+ return TRUE;
+ }
+ }
+ else
+ *timeout = -1;
+
+ if ((kdbus_source->condition & kdbus_source->pollfd.revents) != 0)
+ return TRUE;
+
+ return FALSE;
+}
+
+static gboolean
+kdbus_source_check (GSource *source)
+{
+ int timeout;
+
+ return kdbus_source_prepare (source, &timeout);
+}
+
+static gboolean
+kdbus_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ GKdbusSourceFunc func = (GKdbusSourceFunc)callback;
+ GKdbusSource *kdbus_source = (GKdbusSource *)source;
+ GKdbus *kdbus = kdbus_source->kdbus;
+ gboolean ret;
+
+ if (kdbus_source->kdbus->priv->timed_out)
+ kdbus_source->pollfd.revents |= kdbus_source->condition & (G_IO_IN | G_IO_OUT);
+
+ ret = (*func) (kdbus,
+ kdbus_source->pollfd.revents & kdbus_source->condition,
+ user_data);
+
+ if (kdbus->priv->timeout)
+ kdbus_source->timeout_time = g_get_monotonic_time () +
+ kdbus->priv->timeout * 1000000;
+
+ else
+ kdbus_source->timeout_time = 0;
+
+ return ret;
+}
+
+static void
+kdbus_source_finalize (GSource *source)
+{
+ GKdbusSource *kdbus_source = (GKdbusSource *)source;
+ GKdbus *kdbus;
+
+ kdbus = kdbus_source->kdbus;
+
+ g_object_unref (kdbus);
+
+ if (kdbus_source->cancellable)
+ {
+ g_cancellable_release_fd (kdbus_source->cancellable);
+ g_object_unref (kdbus_source->cancellable);
+ }
+}
+
+static gboolean
+kdbus_source_closure_callback (GKdbus *kdbus,
+ GIOCondition condition,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue params[2] = { G_VALUE_INIT, G_VALUE_INIT };
+ GValue result_value = G_VALUE_INIT;
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (¶ms[0], G_TYPE_KDBUS);
+ g_value_set_object (¶ms[0], kdbus);
+ g_value_init (¶ms[1], G_TYPE_IO_CONDITION);
+ g_value_set_flags (¶ms[1], condition);
+
+ g_closure_invoke (closure, &result_value, 2, params, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (¶ms[0]);
+ g_value_unset (¶ms[1]);
+
+ return result;
+}
+
+static GSourceFuncs kdbus_source_funcs =
+{
+ kdbus_source_prepare,
+ kdbus_source_check,
+ kdbus_source_dispatch,
+ kdbus_source_finalize,
+ (GSourceFunc)kdbus_source_closure_callback,
+ (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+/*
+ * TODO Windows cases removed when
+ */
+
+static GSource *
+kdbus_source_new (GKdbus *kdbus,
+ GIOCondition condition,
+ GCancellable *cancellable)
+{
+ GSource *source;
+ GKdbusSource *kdbus_source;
+
+ condition |= G_IO_HUP | G_IO_ERR;
+
+ source = g_source_new (&kdbus_source_funcs, sizeof (GKdbusSource));
+ g_source_set_name (source, "GKdbus");
+ kdbus_source = (GKdbusSource *)source;
+
+ kdbus_source->kdbus = g_object_ref (kdbus);
+ kdbus_source->condition = condition;
+
+ if (g_cancellable_make_pollfd (cancellable,
+ &kdbus_source->cancel_pollfd))
+ {
+ kdbus_source->cancellable = g_object_ref (cancellable);
+ g_source_add_poll (source, &kdbus_source->cancel_pollfd);
+ }
+
+ kdbus_source->pollfd.fd = kdbus->priv->fd;
+ kdbus_source->pollfd.events = condition;
+ kdbus_source->pollfd.revents = 0;
+ g_source_add_poll (source, &kdbus_source->pollfd);
+
+ if (kdbus->priv->timeout)
+ kdbus_source->timeout_time = g_get_monotonic_time () +
+ kdbus->priv->timeout * 1000000;
+
+ else
+ kdbus_source->timeout_time = 0;
+
+ return source;
+}
+
+GSource *
+g_kdbus_create_source (GKdbus *kdbus,
+ GIOCondition condition,
+ GCancellable *cancellable)
+{
+ g_return_val_if_fail (G_IS_KDBUS (kdbus) && (cancellable == NULL || G_IS_CANCELLABLE (cancellable)), NULL);
+
+ return kdbus_source_new (kdbus, condition, cancellable);
+}
+
/*
* g_kdbus_receive:
* @kdbus: a #GKdbus
*/
gssize
g_kdbus_receive (GKdbus *kdbus,
- void *data,
+ char *data,
GError **error)
{
- int ret_size;
+ int ret_size = 0;
guint64 __attribute__ ((__aligned__(8))) offset;
struct kdbus_msg *msg;
{
if(errno == EINTR)
goto again;
+ g_print ("g_kdbus_receive: ioctl MSG_RECV failed! \n");
return -1;
}
{
if(errno == EINTR)
goto again2;
+ g_print ("g_kdbus_receive: ioctl MSG_RELEASE failed! \n");
return -1;
}
/**
* g_kdbus_send_message:
* @kdbus: a #GKdbus
+ * TODO handle errors
*/
gssize
g_kdbus_send_message (GDBusWorker *worker,