#include "gsocketinputstream.h"
#include "glibintl.h"
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gioerror.h"
-#include "gioalias.h"
+
+static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
#define g_socket_input_stream_get_type _g_socket_input_stream_get_type
-G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
+ )
enum
{
/* pending operation metadata */
GSimpleAsyncResult *result;
GCancellable *cancellable;
- gboolean from_mainloop;
gpointer buffer;
gsize count;
};
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
- return g_socket_receive (input_stream->priv->socket, buffer, count,
- cancellable, error);
+ return g_socket_receive_with_blocking (input_stream->priv->socket,
+ buffer, count, TRUE,
+ cancellable, error);
}
static gboolean
GError *error = NULL;
gssize result;
+ result = g_socket_receive_with_blocking (stream->priv->socket,
+ stream->priv->buffer,
+ stream->priv->count,
+ FALSE,
+ stream->priv->cancellable,
+ &error);
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ return TRUE;
+
simple = stream->priv->result;
stream->priv->result = NULL;
- result = g_socket_receive (stream->priv->socket,
- stream->priv->buffer,
- stream->priv->count,
- stream->priv->cancellable,
- &error);
if (result >= 0)
g_simple_async_result_set_op_res_gssize (simple, result);
if (error)
- {
- g_simple_async_result_set_from_error (simple, error);
- g_error_free (error);
- }
+ g_simple_async_result_take_error (simple, error);
if (stream->priv->cancellable)
g_object_unref (stream->priv->cancellable);
- if (stream->priv->from_mainloop)
- g_simple_async_result_complete (simple);
- else
- g_simple_async_result_complete_in_idle (simple);
-
+ g_simple_async_result_complete (simple);
g_object_unref (simple);
return FALSE;
gpointer user_data)
{
GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
+ GSource *source;
g_assert (input_stream->priv->result == NULL);
input_stream->priv->buffer = buffer;
input_stream->priv->count = count;
- if (!g_socket_condition_check (input_stream->priv->socket, G_IO_IN))
- {
- GSource *source;
-
- input_stream->priv->from_mainloop = TRUE;
- source = g_socket_create_source (input_stream->priv->socket,
- G_IO_IN | G_IO_HUP | G_IO_ERR,
- cancellable);
- g_source_set_callback (source,
- (GSourceFunc) g_socket_input_stream_read_ready,
- g_object_ref (input_stream), g_object_unref);
- g_source_attach (source, NULL);
- g_source_unref (source);
- }
- else
- {
- input_stream->priv->from_mainloop = FALSE;
- g_socket_input_stream_read_ready (input_stream->priv->socket, G_IO_IN, input_stream);
- }
+ source = g_socket_create_source (input_stream->priv->socket,
+ G_IO_IN | G_IO_HUP | G_IO_ERR,
+ cancellable);
+ g_source_set_callback (source,
+ (GSourceFunc) g_socket_input_stream_read_ready,
+ g_object_ref (input_stream), g_object_unref);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
}
static gssize
return count;
}
+static gboolean
+g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+ return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
+}
+
+static GSource *
+g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
+ GCancellable *cancellable)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+ GSource *socket_source, *pollable_source;
+
+ pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
+ socket_source = g_socket_create_source (input_stream->priv->socket,
+ G_IO_IN, cancellable);
+ g_source_set_dummy_callback (socket_source);
+ g_source_add_child_source (pollable_source, socket_source);
+ g_source_unref (socket_source);
+
+ return pollable_source;
+}
+
+static gssize
+g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream *pollable,
+ void *buffer,
+ gsize size,
+ GError **error)
+{
+ GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+ return g_socket_receive_with_blocking (input_stream->priv->socket,
+ buffer, size, FALSE,
+ NULL, error);
+}
+
static void
g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
{
}
static void
+g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = g_socket_input_stream_pollable_is_readable;
+ iface->create_source = g_socket_input_stream_pollable_create_source;
+ iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
+}
+
+static void
g_socket_input_stream_init (GSocketInputStream *stream)
{
stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
{
return G_SOCKET_INPUT_STREAM (g_object_new (G_TYPE_SOCKET_INPUT_STREAM, "socket", socket, NULL));
}
-
-#define __G_SOCKET_INPUT_STREAM_C__
-#include "gioaliasdef.c"