From c20c2c0abd3bdb1b30b85a586ee6095ed75a7bc2 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sat, 18 Sep 2010 13:05:25 -0400 Subject: [PATCH] Add pollable input/output streams When interfacing with APIs that expect unix-style async I/O, it is useful to be able to tell in advance whether a read/write is going to block. This adds new interfaces GPollableInputStream and GPollableOutputStream that can be implemented by a GInputStream or GOutputStream to add _is_readable/_is_writable, _create_source, and _read_nonblocking/_write_nonblocking methods. Also, implement for GUnixInput/OutputStream and GSocketInput/OutputStream https://bugzilla.gnome.org/show_bug.cgi?id=634241 --- docs/reference/gio/gio-docs.xml | 2 + docs/reference/gio/gio-sections.txt | 41 ++++ docs/reference/gio/gio.types | 3 + gio/Makefile.am | 4 + gio/gio.h | 2 + gio/gio.symbols | 21 ++ gio/giotypes.h | 18 ++ gio/gpollableinputstream.c | 304 ++++++++++++++++++++++++++++ gio/gpollableinputstream.h | 101 +++++++++ gio/gpollableoutputstream.c | 201 ++++++++++++++++++ gio/gpollableoutputstream.h | 98 +++++++++ gio/gsocketconnection.c | 3 +- gio/gsocketinputstream.c | 59 +++++- gio/gsocketoutputstream.c | 60 +++++- gio/gunixinputstream.c | 51 ++++- gio/gunixoutputstream.c | 50 ++++- gio/tests/.gitignore | 1 + gio/tests/Makefile.am | 4 + gio/tests/pollable.c | 240 ++++++++++++++++++++++ 19 files changed, 1251 insertions(+), 12 deletions(-) create mode 100644 gio/gpollableinputstream.c create mode 100644 gio/gpollableinputstream.h create mode 100644 gio/gpollableoutputstream.c create mode 100644 gio/gpollableoutputstream.h create mode 100644 gio/tests/pollable.c diff --git a/docs/reference/gio/gio-docs.xml b/docs/reference/gio/gio-docs.xml index 85c8fabfc..f628e45b4 100644 --- a/docs/reference/gio/gio-docs.xml +++ b/docs/reference/gio/gio-docs.xml @@ -68,6 +68,8 @@ + + File types and applications diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 25532adcc..68d6b28a8 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -2934,3 +2934,44 @@ G_IS_PERIODIC g_periodic_get_type + +
+gpollableinputstream +GPollableInputStream +GPollableInputStream +GPollableInputStreamInterface + +g_pollable_input_stream_can_poll +g_pollable_input_stream_is_readable +g_pollable_input_stream_create_source +g_pollable_input_stream_read_nonblocking + +GPollableSourceFunc +g_pollable_source_new + +G_POLLABLE_INPUT_STREAM +G_POLLABLE_INPUT_STREAM_GET_INTERFACE +G_IS_POLLABLE_INPUT_STREAM +G_TYPE_POLLABLE_INPUT_STREAM + +g_pollable_input_stream_get_type +
+ +
+gpollableoutputstream +GPollableOutputStream +GPollableOutputStream +GPollableOutputStreamInterface + +g_pollable_output_stream_can_poll +g_pollable_output_stream_is_writable +g_pollable_output_stream_create_source +g_pollable_output_stream_write_nonblocking + +G_POLLABLE_OUTPUT_STREAM +G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE +G_IS_POLLABLE_OUTPUT_STREAM +G_TYPE_POLLABLE_OUTPUT_STREAM + +g_pollable_output_stream_get_type +
diff --git a/docs/reference/gio/gio.types b/docs/reference/gio/gio.types index ec01b40c9..b0472199a 100644 --- a/docs/reference/gio/gio.types +++ b/docs/reference/gio/gio.types @@ -76,6 +76,9 @@ g_output_stream_splice_flags_get_type g_password_save_get_type g_periodic_get_type g_permission_get_type +g_pollable_input_stream_get_type +g_pollable_io_stream_get_type +g_pollable_output_stream_get_type g_proxy_address_enumerator_get_type g_proxy_address_get_type g_proxy_get_type diff --git a/gio/Makefile.am b/gio/Makefile.am index 2aedfc102..5da34b2c6 100644 --- a/gio/Makefile.am +++ b/gio/Makefile.am @@ -349,6 +349,8 @@ libgio_2_0_la_SOURCES = \ goutputstream.c \ gperiodic.c \ gpermission.c \ + gpollableinputstream.c \ + gpollableoutputstream.c \ gpollfilemonitor.c \ gpollfilemonitor.h \ gproxyresolver.c \ @@ -505,6 +507,8 @@ gio_headers = \ goutputstream.h \ gperiodic.h \ gpermission.h \ + gpollableinputstream.h \ + gpollableoutputstream.h \ gproxyaddress.h \ gproxy.h \ gproxyaddressenumerator.h \ diff --git a/gio/gio.h b/gio/gio.h index c18384ff0..0babc7c10 100644 --- a/gio/gio.h +++ b/gio/gio.h @@ -95,6 +95,8 @@ #include #include #include +#include +#include #include #include #include diff --git a/gio/gio.symbols b/gio/gio.symbols index b417ebae2..867cf6385 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -1973,3 +1973,24 @@ g_periodic_remove g_periodic_unblock #endif #endif + +#if IN_HEADER(__G_POLLABLE_INPUT_STREAM_H__) +#if IN_FILE(__G_POLLABLE_INPUT_STREAM_C__) +g_pollable_input_stream_get_type G_GNUC_CONST +g_pollable_input_stream_can_poll +g_pollable_input_stream_create_source +g_pollable_input_stream_is_readable +g_pollable_input_stream_read_nonblocking +g_pollable_source_new +#endif +#endif + +#if IN_HEADER(__G_POLLABLE_OUTPUT_STREAM_H__) +#if IN_FILE(__G_POLLABLE_OUTPUT_STREAM_C__) +g_pollable_output_stream_get_type G_GNUC_CONST +g_pollable_output_stream_can_poll +g_pollable_output_stream_create_source +g_pollable_output_stream_is_writable +g_pollable_output_stream_write_nonblocking +#endif +#endif diff --git a/gio/giotypes.h b/gio/giotypes.h index 829d28534..808e84d41 100644 --- a/gio/giotypes.h +++ b/gio/giotypes.h @@ -124,6 +124,8 @@ typedef struct _GNetworkAddress GNetworkAddress; typedef struct _GNetworkService GNetworkService; typedef struct _GOutputStream GOutputStream; typedef struct _GIOStream GIOStream; +typedef struct _GPollableInputStream GPollableInputStream; /* Dummy typedef */ +typedef struct _GPollableOutputStream GPollableOutputStream; /* Dummy typedef */ typedef struct _GResolver GResolver; typedef struct _GSeekable GSeekable; typedef struct _GSimpleAsyncResult GSimpleAsyncResult; @@ -391,6 +393,22 @@ typedef struct _GDBusNodeInfo GDBusNodeInfo; typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable, gpointer user_data); +/** + * GPollableSourceFunc: + * @pollable_stream: the #GPollableInputStream or #GPollableOutputStream + * @user_data: data passed in by the user. + * + * This is the function type of the callback used for the #GSource + * returned by g_pollable_input_stream_create_source() and + * g_pollable_output_stream_create_source(). + * + * Returns: it should return %FALSE if the source should be removed. + * + * Since: 2.28 + */ +typedef gboolean (*GPollableSourceFunc) (GObject *pollable_stream, + gpointer user_data); + G_END_DECLS #endif /* __GIO_TYPES_H__ */ diff --git a/gio/gpollableinputstream.c b/gio/gpollableinputstream.c new file mode 100644 index 000000000..19946c666 --- /dev/null +++ b/gio/gpollableinputstream.c @@ -0,0 +1,304 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) 2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "config.h" + +#include + +#include "gpollableinputstream.h" +#include "gasynchelper.h" +#include "gio-marshal.h" +#include "glibintl.h" + +/** + * SECTION:gpollableinputstream + * @short_description: Interface for pollable input streams + * @include: gio/gio.h + * @see_also: #GInputStream, #GPollableOutputStream, #GFileDescriptorBased + * + * #GPollableInputStream is implemented by #GInputStreams that + * can be polled for readiness to read. This can be used when + * interfacing with a non-gio API that expects + * unix-file-descriptor-style asynchronous I/O rather than gio-style. + * + * Since: 2.28 + */ + +G_DEFINE_INTERFACE (GPollableInputStream, g_pollable_input_stream, G_TYPE_INPUT_STREAM) + +static gboolean g_pollable_input_stream_default_can_poll (GPollableInputStream *stream); +static gssize g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GError **error); + +static void +g_pollable_input_stream_default_init (GPollableInputStreamInterface *iface) +{ + iface->can_poll = g_pollable_input_stream_default_can_poll; + iface->read_nonblocking = g_pollable_input_stream_default_read_nonblocking; +} + +static gboolean +g_pollable_input_stream_default_can_poll (GPollableInputStream *stream) +{ + return TRUE; +} + +/** + * g_pollable_input_stream_can_poll: + * @stream: a #GPollableInputStream. + * + * Checks if @stream is actually pollable. Some classes may implement + * #GPollableInputStream but have only certain instances of that class + * be pollable. If this method returns %FALSE, then the behavior of + * other #GPollableInputStream methods is undefined. + * + * For any given stream, the value returned by this method is constant; + * a stream cannot switch from pollable to non-pollable or vice versa. + * + * Returns: %TRUE if @stream is pollable, %FALSE if not. + * + * Since: 2.28 + */ +gboolean +g_pollable_input_stream_can_poll (GPollableInputStream *stream) +{ + g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE); + + return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream); +} + +/** + * g_pollable_input_stream_is_readable: + * @stream: a #GPollableInputStream. + * + * Checks if @stream can be read. + * + * Note that some stream types may not be able to implement this 100% + * reliably, and it is possible that a call to g_input_stream_read() + * after this returns %TRUE would still block. To guarantee + * non-blocking behavior, you should always use + * g_pollable_input_stream_read_nonblocking(), which will return a + * %G_IO_ERROR_WOULD_BLOCK error rather than blocking. + * + * Returns: %TRUE if @stream is readable, %FALSE if not. If an error + * has occurred on @stream, this will result in + * g_pollable_input_stream_is_readable() returning %TRUE, and the + * next attempt to read will return the error. + * + * Since: 2.28 + */ +gboolean +g_pollable_input_stream_is_readable (GPollableInputStream *stream) +{ + g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE); + + return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->is_readable (stream); +} + +/** + * g_pollable_input_stream_create_source: + * @stream: a #GPollableInputStream. + * @cancellable: a #GCancellable, or %NULL + * + * Creates a #GSource that triggers when @stream can be read, or + * @cancellable is triggered or an error occurs. The callback on the + * source is of the #GPollableSourceFunc type. + * + * As with g_pollable_input_stream_is_readable(), it is possible that + * the stream may not actually be readable even after the source + * triggers, so you should use + * g_pollable_input_stream_read_nonblocking() rather than + * g_input_stream_read() from the callback. + * + * Returns: a new #GSource + * + * Since: 2.28 + */ +GSource * +g_pollable_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), NULL); + + return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)-> + create_source (stream, cancellable); +} + +static gssize +g_pollable_input_stream_default_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GError **error) +{ + if (!g_pollable_input_stream_is_readable (stream)) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + g_strerror (EAGAIN)); + return -1; + } + + return g_input_stream_read (G_INPUT_STREAM (stream), buffer, size, + NULL, error); +} + +/** + * g_pollable_input_stream_read_nonblocking: + * @stream: a #GPollableInputStream + * @buffer: a buffer to read data into (which should be at least @size + * bytes long). + * @size: the number of bytes you want to read + * @cancellable: a #GCancellable, or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Attempts to read up to @size bytes from @stream into @buffer, as + * with g_input_stream_read(). If @stream is not currently readable, + * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can + * use g_pollable_input_stream_create_source() to create a #GSource + * that will be triggered when @stream is readable. + * + * Note that since this method never blocks, you cannot actually + * use @cancellable to cancel it. However, it will return an error + * if @cancellable has already been cancelled when you call, which + * may happen if you call this method after a source triggers due + * to having been cancelled. + * + * Return value: the number of bytes read, or -1 on error (including + * %G_IO_ERROR_WOULD_BLOCK). + */ +gssize +g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), -1); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)-> + read_nonblocking (stream, buffer, size, error); +} + +/* GPollableSource */ + +typedef struct { + GSource source; + + GObject *stream; +} GPollableSource; + +static gboolean +pollable_source_prepare (GSource *source, + gint *timeout) +{ + *timeout = -1; + return FALSE; +} + +static gboolean +pollable_source_check (GSource *source) +{ + return FALSE; +} + +static gboolean +pollable_source_dispatch (GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + GPollableSourceFunc func = (GPollableSourceFunc)callback; + GPollableSource *pollable_source = (GPollableSource *)source; + + return (*func) (pollable_source->stream, user_data); +} + +static void +pollable_source_finalize (GSource *source) +{ + GPollableSource *pollable_source = (GPollableSource *)source; + + g_object_unref (pollable_source->stream); +} + +static gboolean +pollable_source_closure_callback (GObject *stream, + gpointer data) +{ + GClosure *closure = data; + + GValue param = { 0, }; + GValue result_value = { 0, }; + gboolean result; + + g_value_init (&result_value, G_TYPE_BOOLEAN); + + g_value_init (¶m, G_TYPE_OBJECT); + g_value_set_object (¶m, stream); + + g_closure_invoke (closure, &result_value, 1, ¶m, NULL); + + result = g_value_get_boolean (&result_value); + g_value_unset (&result_value); + g_value_unset (¶m); + + return result; +} + +static GSourceFuncs pollable_source_funcs = +{ + pollable_source_prepare, + pollable_source_check, + pollable_source_dispatch, + pollable_source_finalize, + (GSourceFunc)pollable_source_closure_callback, + (GSourceDummyMarshal)_gio_marshal_BOOLEAN__VOID, +}; + +/** + * g_pollable_source_new: + * @pollable_stream: the stream associated with the new source + * + * Utility method for #GPollableInputStream and #GPollableOutputStream + * implementations. Creates a new #GSource that expects a callback of + * type #GPollableSourceFunc. The new source does not actually do + * anything on its own; use g_source_add_child_source() to add other + * sources to it to cause it to trigger. + * + * Return value: the new #GSource. + * + * Since: 2.28 + */ +GSource * +g_pollable_source_new (GObject *pollable_stream) +{ + GSource *source; + GPollableSource *pollable_source; + + source = g_source_new (&pollable_source_funcs, sizeof (GPollableSource)); + g_source_set_name (source, "GPollableSource"); + pollable_source = (GPollableSource *)source; + pollable_source->stream = g_object_ref (pollable_stream); + + return source; +} diff --git a/gio/gpollableinputstream.h b/gio/gpollableinputstream.h new file mode 100644 index 000000000..5def93ba8 --- /dev/null +++ b/gio/gpollableinputstream.h @@ -0,0 +1,101 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) 2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __G_POLLABLE_INPUT_STREAM_H__ +#define __G_POLLABLE_INPUT_STREAM_H__ + +#include + +G_BEGIN_DECLS + +#define G_TYPE_POLLABLE_INPUT_STREAM (g_pollable_input_stream_get_type ()) +#define G_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStream)) +#define G_IS_POLLABLE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_INPUT_STREAM)) +#define G_POLLABLE_INPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStreamInterface)) + +/** + * GPollableInputStream: + * + * An interface for a #GInputStream that can be polled for readability. + * + * Since: 2.28 + */ +typedef struct _GPollableInputStreamInterface GPollableInputStreamInterface; + +/** + * GPollableInputStreamInterface: + * @g_iface: The parent interface. + * @can_poll: Checks if the #GPollableInputStream instance is actually pollable + * @is_readable: Checks if the stream is readable + * @create_source: Creates a #GSource to poll the stream + * @read_nonblocking: Does a non-blocking read or returns + * %G_IO_ERROR_WOULD_BLOCK + * + * The interface for pollable input streams. + * + * The default implementation of @can_poll always returns %TRUE. + * + * The default implementation of @read_nonblocking calls + * g_pollable_input_stream_is_readable(), and then calls + * g_input_stream_read() if it returns %TRUE. This means you only need + * to override it if it is possible that your @is_readable + * implementation may return %TRUE when the stream is not actually + * readable. + * + * Since: 2.28 + */ +struct _GPollableInputStreamInterface +{ + GTypeInterface g_iface; + + /* Virtual Table */ + gboolean (*can_poll) (GPollableInputStream *stream); + + gboolean (*is_readable) (GPollableInputStream *stream); + GSource * (*create_source) (GPollableInputStream *stream, + GCancellable *cancellable); + gssize (*read_nonblocking) (GPollableInputStream *stream, + void *buffer, + gsize size, + GError **error); +}; + +GType g_pollable_input_stream_get_type (void) G_GNUC_CONST; + +gboolean g_pollable_input_stream_can_poll (GPollableInputStream *stream); + +gboolean g_pollable_input_stream_is_readable (GPollableInputStream *stream); +GSource *g_pollable_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable); + +gssize g_pollable_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize size, + GCancellable *cancellable, + GError **error); + +/* Helper method for stream implementations */ +GSource *g_pollable_source_new (GObject *stream); + +G_END_DECLS + + +#endif /* __G_POLLABLE_INPUT_STREAM_H__ */ + diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c new file mode 100644 index 000000000..cb9138aad --- /dev/null +++ b/gio/gpollableoutputstream.c @@ -0,0 +1,201 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) 2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "config.h" + +#include + +#include "gpollableoutputstream.h" +#include "gasynchelper.h" +#include "gfiledescriptorbased.h" +#include "gio-marshal.h" +#include "glibintl.h" + +/** + * SECTION:gpollableoutputstream + * @short_description: Interface for pollable output streams + * @include: gio/gio.h + * @see_also: #GOutputStream, #GFileDescriptorBased, #GPollableInputStream + * + * #GPollableOutputStream is implemented by #GOutputStreams that + * can be polled for readiness to write. This can be used when + * interfacing with a non-gio API that expects + * unix-file-descriptor-style asynchronous I/O rather than gio-style. + * + * Since: 2.28 + */ + +G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM) + +static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream); +static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GError **error); + +static void +g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface) +{ + iface->can_poll = g_pollable_output_stream_default_can_poll; + iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking; +} + +static gboolean +g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream) +{ + return TRUE; +} + +/** + * g_pollable_output_stream_can_poll: + * @stream: a #GPollableOutputStream. + * + * Checks if @stream is actually pollable. Some classes may implement + * #GPollableOutputStream but have only certain instances of that + * class be pollable. If this method returns %FALSE, then the behavior + * of other #GPollableOutputStream methods is undefined. + * + * For any given stream, the value returned by this method is constant; + * a stream cannot switch from pollable to non-pollable or vice versa. + * + * Returns: %TRUE if @stream is pollable, %FALSE if not. + * + * Since: 2.28 + */ +gboolean +g_pollable_output_stream_can_poll (GPollableOutputStream *stream) +{ + g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE); + + return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream); +} + +/** + * g_pollable_output_stream_is_writable: + * @stream: a #GPollableOutputStream. + * + * Checks if @stream can be written. + * + * Note that some stream types may not be able to implement this 100% + * reliably, and it is possible that a call to g_output_stream_write() + * after this returns %TRUE would still block. To guarantee + * non-blocking behavior, you should always use + * g_pollable_output_stream_write_nonblocking(), which will return a + * %G_IO_ERROR_WOULD_BLOCK error rather than blocking. + * + * Returns: %TRUE if @stream is writable, %FALSE if not. If an error + * has occurred on @stream, this will result in + * g_pollable_output_stream_is_writable() returning %TRUE, and the + * next attempt to write will return the error. + * + * Since: 2.28 + */ +gboolean +g_pollable_output_stream_is_writable (GPollableOutputStream *stream) +{ + g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE); + + return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->is_writable (stream); +} + +/** + * g_pollable_output_stream_create_source: + * @stream: a #GPollableOutputStream. + * @cancellable: a #GCancellable, or %NULL + * + * Creates a #GSource that triggers when @stream can be written, or + * @cancellable is triggered or an error occurs. The callback on the + * source is of the #GPollableSourceFunc type. + * + * As with g_pollable_output_stream_is_writable(), it is possible that + * the stream may not actually be writable even after the source + * triggers, so you should use + * g_pollable_output_stream_write_nonblocking() rather than + * g_output_stream_write() from the callback. + * + * Returns: a new #GSource + * + * Since: 2.28 + */ +GSource * +g_pollable_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), NULL); + + return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + create_source (stream, cancellable); +} + +static gssize +g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GError **error) +{ + if (!g_pollable_output_stream_is_writable (stream)) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + g_strerror (EAGAIN)); + return -1; + } + + return g_output_stream_write (G_OUTPUT_STREAM (stream), buffer, size, + NULL, error); +} + +/** + * g_pollable_output_stream_write_nonblocking: + * @stream: a #GPollableOutputStream + * @buffer: a buffer to write data from + * @size: the number of bytes you want to write + * @cancellable: a #GCancellable, or %NULL + * @error: #GError for error reporting, or %NULL to ignore. + * + * Attempts to write up to @size bytes from @buffer to @stream, as + * with g_output_stream_write(). If @stream is not currently writable, + * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can + * use g_pollable_output_stream_create_source() to create a #GSource + * that will be triggered when @stream is writable. + * + * Note that since this method never blocks, you cannot actually + * use @cancellable to cancel it. However, it will return an error + * if @cancellable has already been cancelled when you call, which + * may happen if you call this method after a source triggers due + * to having been cancelled. + * + * Return value: the number of bytes written, or -1 on error (including + * %G_IO_ERROR_WOULD_BLOCK). + */ +gssize +g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GCancellable *cancellable, + GError **error) +{ + g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), -1); + + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return -1; + + return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, buffer, size, error); +} diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h new file mode 100644 index 000000000..abef0ede6 --- /dev/null +++ b/gio/gpollableoutputstream.h @@ -0,0 +1,98 @@ +/* GIO - GLib Input, Output and Streaming Library + * + * Copyright (C) 2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __G_POLLABLE_OUTPUT_STREAM_H__ +#define __G_POLLABLE_OUTPUT_STREAM_H__ + +#include + +G_BEGIN_DECLS + +#define G_TYPE_POLLABLE_OUTPUT_STREAM (g_pollable_output_stream_get_type ()) +#define G_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStream)) +#define G_IS_POLLABLE_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM)) +#define G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStreamInterface)) + +/** + * GPollableOutputStream: + * + * An interface for a #GOutputStream that can be polled for readability. + * + * Since: 2.28 + */ +typedef struct _GPollableOutputStreamInterface GPollableOutputStreamInterface; + +/** + * GPollableOutputStreamInterface: + * @g_iface: The parent interface. + * @can_poll: Checks if the #GPollableOutputStream instance is actually pollable + * @is_writable: Checks if the stream is writable + * @create_source: Creates a #GSource to poll the stream + * @write_nonblocking: Does a non-blocking write or returns + * %G_IO_ERROR_WOULD_BLOCK + * + * The interface for pollable output streams. + * + * The default implementation of @can_poll always returns %TRUE. + * + * The default implementation of @write_nonblocking calls + * g_pollable_output_stream_is_writable(), and then calls + * g_output_stream_write() if it returns %TRUE. This means you only + * need to override it if it is possible that your @is_writable + * implementation may return %TRUE when the stream is not actually + * writable. + * + * Since: 2.28 + */ +struct _GPollableOutputStreamInterface +{ + GTypeInterface g_iface; + + /* Virtual Table */ + gboolean (*can_poll) (GPollableOutputStream *stream); + + gboolean (*is_writable) (GPollableOutputStream *stream); + GSource * (*create_source) (GPollableOutputStream *stream, + GCancellable *cancellable); + gssize (*write_nonblocking) (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GError **error); +}; + +GType g_pollable_output_stream_get_type (void) G_GNUC_CONST; + +gboolean g_pollable_output_stream_can_poll (GPollableOutputStream *stream); + +gboolean g_pollable_output_stream_is_writable (GPollableOutputStream *stream); +GSource *g_pollable_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable); + +gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream, + const void *buffer, + gsize size, + GCancellable *cancellable, + GError **error); + +G_END_DECLS + + +#endif /* __G_POLLABLE_OUTPUT_STREAM_H__ */ + diff --git a/gio/gsocketconnection.c b/gio/gsocketconnection.c index 495f81c36..852805af3 100644 --- a/gio/gsocketconnection.c +++ b/gio/gsocketconnection.c @@ -60,8 +60,7 @@ * Since: 2.22 */ -G_DEFINE_TYPE (GSocketConnection, - g_socket_connection, G_TYPE_IO_STREAM); +G_DEFINE_TYPE (GSocketConnection, g_socket_connection, G_TYPE_IO_STREAM); enum { diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c index 4a27d9034..fb9da1bf1 100644 --- a/gio/gsocketinputstream.c +++ b/gio/gsocketinputstream.c @@ -27,13 +27,18 @@ #include "gsocketinputstream.h" #include "glibintl.h" -#include -#include -#include +#include "gsimpleasyncresult.h" +#include "gcancellable.h" +#include "gpollableinputstream.h" +#include "gioerror.h" +static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); + #define g_socket_input_stream_get_type _g_socket_input_stream_get_type -G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM); +G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init) + ) enum { @@ -205,6 +210,44 @@ g_socket_input_stream_read_finish (GInputStream *stream, return count; } +static gboolean +g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable) +{ + GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable); + + return g_socket_condition_check (input_stream->priv->socket, G_IO_IN); +} + +static GSource * +g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable, + GCancellable *cancellable) +{ + GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable); + GSource *socket_source, *pollable_source; + + pollable_source = g_pollable_source_new (G_OBJECT (input_stream)); + socket_source = g_socket_create_source (input_stream->priv->socket, + G_IO_IN, cancellable); + g_source_set_dummy_callback (socket_source); + g_source_add_child_source (pollable_source, socket_source); + g_source_unref (socket_source); + + return pollable_source; +} + +static gssize +g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream *pollable, + void *buffer, + gsize size, + GError **error) +{ + GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable); + + return g_socket_receive_with_blocking (input_stream->priv->socket, + buffer, size, FALSE, + NULL, error); +} + static void g_socket_input_stream_class_init (GSocketInputStreamClass *klass) { @@ -229,6 +272,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } +static void +g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->is_readable = g_socket_input_stream_pollable_is_readable; + iface->create_source = g_socket_input_stream_pollable_create_source; + iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking; +} + static void g_socket_input_stream_init (GSocketInputStream *stream) { diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c index 4febf8803..0ef336016 100644 --- a/gio/gsocketoutputstream.c +++ b/gio/gsocketoutputstream.c @@ -28,14 +28,20 @@ #include "gsocketoutputstream.h" #include "gsocket.h" -#include -#include -#include +#include "gsimpleasyncresult.h" +#include "gcancellable.h" +#include "gpollableinputstream.h" +#include "gpollableoutputstream.h" +#include "gioerror.h" #include "glibintl.h" +static void g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); + #define g_socket_output_stream_get_type _g_socket_output_stream_get_type -G_DEFINE_TYPE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM); +G_DEFINE_TYPE_WITH_CODE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, g_socket_output_stream_pollable_iface_init) + ) enum { @@ -207,6 +213,44 @@ g_socket_output_stream_write_finish (GOutputStream *stream, return count; } +static gboolean +g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable) +{ + GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable); + + return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT); +} + +static GSource * +g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable, + GCancellable *cancellable) +{ + GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable); + GSource *socket_source, *pollable_source; + + pollable_source = g_pollable_source_new (G_OBJECT (output_stream)); + socket_source = g_socket_create_source (output_stream->priv->socket, + G_IO_OUT, cancellable); + g_source_set_dummy_callback (socket_source); + g_source_add_child_source (pollable_source, socket_source); + g_source_unref (socket_source); + + return pollable_source; +} + +static gssize +g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable, + const void *buffer, + gsize size, + GError **error) +{ + GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable); + + return g_socket_send_with_blocking (output_stream->priv->socket, + buffer, size, FALSE, + NULL, error); +} + static void g_socket_output_stream_class_init (GSocketOutputStreamClass *klass) { @@ -231,6 +275,14 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } +static void +g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface) +{ + iface->is_writable = g_socket_output_stream_pollable_is_writable; + iface->create_source = g_socket_output_stream_pollable_create_source; + iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking; +} + static void g_socket_output_stream_init (GSocketOutputStream *stream) { diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c index e1ee34ac1..43a924729 100644 --- a/gio/gunixinputstream.c +++ b/gio/gunixinputstream.c @@ -60,7 +60,12 @@ enum { PROP_CLOSE_FD }; -G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM); +static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface); + +G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + g_unix_input_stream_pollable_iface_init) + ); struct _GUnixInputStreamPrivate { int fd; @@ -111,6 +116,9 @@ static gboolean g_unix_input_stream_close_finish (GInputStream *stream, GAsyncResult *result, GError **error); +static gboolean g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream); +static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream, + GCancellable *cancellable); static void g_unix_input_stream_finalize (GObject *object) @@ -174,6 +182,13 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass) G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB)); } +static void +g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface) +{ + iface->is_readable = g_unix_input_stream_pollable_is_readable; + iface->create_source = g_unix_input_stream_pollable_create_source; +} + static void g_unix_input_stream_set_property (GObject *object, guint prop_id, @@ -637,3 +652,37 @@ g_unix_input_stream_close_finish (GInputStream *stream, /* Failures handled in generic close_finish code */ return TRUE; } + +static gboolean +g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream) +{ + GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream); + GPollFD poll_fd; + gint result; + + poll_fd.fd = unix_stream->priv->fd; + poll_fd.events = G_IO_IN; + + do + result = g_poll (&poll_fd, 1, 0); + while (result == -1 && errno == EINTR); + + return poll_fd.revents != 0; +} + +static GSource * +g_unix_input_stream_pollable_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream); + GSource *inner_source, *pollable_source; + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + + inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable); + g_source_set_dummy_callback (inner_source); + g_source_add_child_source (pollable_source, inner_source); + g_source_unref (inner_source); + + return pollable_source; +} diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c index a0acc318d..81021be8e 100644 --- a/gio/gunixoutputstream.c +++ b/gio/gunixoutputstream.c @@ -60,8 +60,12 @@ enum { PROP_CLOSE_FD }; -G_DEFINE_TYPE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM); +static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface); +G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + g_unix_output_stream_pollable_iface_init) + ); struct _GUnixOutputStreamPrivate { int fd; @@ -103,6 +107,9 @@ static gboolean g_unix_output_stream_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error); +static gboolean g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream); +static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream, + GCancellable *cancellable); static void g_unix_output_stream_finalize (GObject *object) @@ -160,6 +167,13 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass) G_PARAM_READABLE | G_PARAM_WRITABLE | G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK | G_PARAM_STATIC_BLURB)); } +static void +g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface) +{ + iface->is_writable = g_unix_output_stream_pollable_is_writable; + iface->create_source = g_unix_output_stream_pollable_create_source; +} + static void g_unix_output_stream_set_property (GObject *object, guint prop_id, @@ -593,3 +607,37 @@ g_unix_output_stream_close_finish (GOutputStream *stream, /* Failures handled in generic close_finish code */ return TRUE; } + +static gboolean +g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream) +{ + GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream); + GPollFD poll_fd; + gint result; + + poll_fd.fd = unix_stream->priv->fd; + poll_fd.events = G_IO_OUT; + + do + result = g_poll (&poll_fd, 1, 0); + while (result == -1 && errno == EINTR); + + return poll_fd.revents != 0; +} + +static GSource * +g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream); + GSource *inner_source, *pollable_source; + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + + inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable); + g_source_set_dummy_callback (inner_source); + g_source_add_child_source (pollable_source, inner_source); + g_source_unref (inner_source); + + return pollable_source; +} diff --git a/gio/tests/.gitignore b/gio/tests/.gitignore index c987b6471..0663fefcd 100644 --- a/gio/tests/.gitignore +++ b/gio/tests/.gitignore @@ -60,6 +60,7 @@ memory-input-stream memory-output-stream network-address org.gtk.test.enums.xml +pollable proxy readwrite resolver diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am index 5e65c08e2..ad2273a42 100644 --- a/gio/tests/Makefile.am +++ b/gio/tests/Makefile.am @@ -43,6 +43,7 @@ TEST_PROGS += \ network-address \ gdbus-message \ socket \ + pollable \ $(NULL) if OS_UNIX @@ -204,6 +205,9 @@ network_address_LDADD = $(progs_ldadd) socket_SOURCE = socket.c socket_LDADD = $(progs_ldadd) +pollable_SOURCE = pollable.c +pollable_LDADD = $(progs_ldadd) + contexts_SOURCES = contexts.c contexts_LDADD = $(progs_ldadd) \ $(top_builddir)/gthread/libgthread-2.0.la diff --git a/gio/tests/pollable.c b/gio/tests/pollable.c new file mode 100644 index 000000000..8669e2b1a --- /dev/null +++ b/gio/tests/pollable.c @@ -0,0 +1,240 @@ +/* GLib testing framework examples and tests + * + * Copyright (C) 2010 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include + +#ifdef G_OS_UNIX +#include +#include +#endif + +GMainLoop *loop; +GPollableInputStream *in; +GOutputStream *out; + +static gboolean +poll_source_callback (GPollableInputStream *in, + gpointer user_data) +{ + GError *error = NULL; + char buf[2]; + gssize nread; + gboolean *success = user_data; + + nread = g_pollable_input_stream_read_nonblocking (in, buf, 2, NULL, &error); + g_assert_no_error (error); + g_assert_cmpint (nread, ==, 2); + g_assert_cmpstr (buf, ==, "x"); + + *success = TRUE; + return FALSE; +} + +static gboolean +check_source_readability_callback (gpointer user_data) +{ + gboolean expected = GPOINTER_TO_INT (user_data); + gboolean readable; + + readable = g_pollable_input_stream_is_readable (in); + g_assert_cmpint (readable, ==, expected); + return FALSE; +} + +static gboolean +write_callback (gpointer user_data) +{ + char *buf = "x"; + gssize nwrote; + GError *error = NULL; + + nwrote = g_output_stream_write (out, buf, 2, NULL, &error); + g_assert_no_error (error); + g_assert_cmpint (nwrote, ==, 2); + + check_source_readability_callback (GINT_TO_POINTER (TRUE)); + + return FALSE; +} + +static gboolean +check_source_and_quit_callback (gpointer user_data) +{ + check_source_readability_callback (user_data); + g_main_loop_quit (loop); + return FALSE; +} + +static void +test_streams (void) +{ + gboolean readable; + GError *error = NULL; + char buf[1]; + gssize nread; + GSource *poll_source; + gboolean success = FALSE; + + readable = g_pollable_input_stream_is_readable (in); + g_assert (!readable); + + nread = g_pollable_input_stream_read_nonblocking (in, buf, 1, NULL, &error); + g_assert_cmpint (nread, ==, -1); + g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); + g_clear_error (&error); + + /* Create 4 sources, in decreasing order of priority: + * 1. poll source on @in + * 2. idle source that checks if @in is readable once + * (it won't be) and then removes itself + * 3. idle source that writes a byte to @out, checks that + * @in is now readable, and removes itself + * 4. idle source that checks if @in is readable once + * (it won't be, since the poll source will fire before + * this one does) and then quits the loop. + * + * If the poll source triggers before it should, then it will get a + * %G_IO_ERROR_WOULD_BLOCK, and if check() fails in either + * direction, we will catch it at some point. + */ + + poll_source = g_pollable_input_stream_create_source (in, NULL); + g_source_set_priority (poll_source, 1); + g_source_set_callback (poll_source, (GSourceFunc) poll_source_callback, &success, NULL); + g_source_attach (poll_source, NULL); + g_source_unref (poll_source); + + g_idle_add_full (2, check_source_readability_callback, GINT_TO_POINTER (FALSE), NULL); + g_idle_add_full (3, write_callback, NULL, NULL); + g_idle_add_full (4, check_source_and_quit_callback, GINT_TO_POINTER (FALSE), NULL); + + loop = g_main_loop_new (NULL, FALSE); + g_main_loop_run (loop); + g_main_loop_unref (loop); + + g_assert_cmpint (success, ==, TRUE); +} + +#ifdef G_OS_UNIX +static void +test_pollable_unix (void) +{ + int pipefds[2], status; + + status = pipe (pipefds); + g_assert_cmpint (status, ==, 0); + + in = G_POLLABLE_INPUT_STREAM (g_unix_input_stream_new (pipefds[0], TRUE)); + out = g_unix_output_stream_new (pipefds[1], TRUE); + + test_streams (); + + g_object_unref (in); + g_object_unref (out); +} +#endif + +static void +client_connected (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + GSocketClient *client = G_SOCKET_CLIENT (source); + GSocketConnection **conn = user_data; + GError *error = NULL; + + *conn = g_socket_client_connect_finish (client, result, &error); + g_assert_no_error (error); +} + +static void +server_connected (GObject *source, + GAsyncResult *result, + gpointer user_data) +{ + GSocketListener *listener = G_SOCKET_LISTENER (source); + GSocketConnection **conn = user_data; + GError *error = NULL; + + *conn = g_socket_listener_accept_finish (listener, result, NULL, &error); + g_assert_no_error (error); +} + +static void +test_pollable_socket (void) +{ + GInetAddress *iaddr; + GSocketAddress *saddr, *effective_address; + GSocketListener *listener; + GSocketClient *client; + GError *error = NULL; + GSocketConnection *client_conn = NULL, *server_conn = NULL; + + iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4); + saddr = g_inet_socket_address_new (iaddr, 0); + g_object_unref (iaddr); + + listener = g_socket_listener_new (); + g_socket_listener_add_address (listener, saddr, + G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, + NULL, + &effective_address, + &error); + g_assert_no_error (error); + g_object_unref (saddr); + + client = g_socket_client_new (); + + g_socket_client_connect_async (client, + G_SOCKET_CONNECTABLE (effective_address), + NULL, client_connected, &client_conn); + g_socket_listener_accept_async (listener, NULL, + server_connected, &server_conn); + + while (!client_conn || !server_conn) + g_main_context_iteration (NULL, TRUE); + + in = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (G_IO_STREAM (client_conn))); + out = g_io_stream_get_output_stream (G_IO_STREAM (server_conn)); + + test_streams (); + + g_object_unref (client_conn); + g_object_unref (server_conn); + g_object_unref (client); + g_object_unref (listener); +} + +int +main (int argc, + char *argv[]) +{ + g_type_init (); + g_test_init (&argc, &argv, NULL); + +#ifdef G_OS_UNIX + g_test_add_func ("/pollable/unix", test_pollable_unix); +#endif + g_test_add_func ("/pollable/socket", test_pollable_socket); + + return g_test_run(); +} + -- 2.34.1