Add pollable input/output streams
authorDan Winship <danw@gnome.org>
Sat, 18 Sep 2010 17:05:25 +0000 (13:05 -0400)
committerDan Winship <danw@gnome.org>
Fri, 26 Nov 2010 20:08:08 +0000 (15:08 -0500)
When interfacing with APIs that expect unix-style async I/O, it is
useful to be able to tell in advance whether a read/write is going to
block. This adds new interfaces GPollableInputStream and
GPollableOutputStream that can be implemented by a GInputStream or
GOutputStream to add _is_readable/_is_writable, _create_source, and
_read_nonblocking/_write_nonblocking methods.

Also, implement for GUnixInput/OutputStream and
GSocketInput/OutputStream

https://bugzilla.gnome.org/show_bug.cgi?id=634241

19 files changed:
docs/reference/gio/gio-docs.xml
docs/reference/gio/gio-sections.txt
docs/reference/gio/gio.types
gio/Makefile.am
gio/gio.h
gio/gio.symbols
gio/giotypes.h
gio/gpollableinputstream.c [new file with mode: 0644]
gio/gpollableinputstream.h [new file with mode: 0644]
gio/gpollableoutputstream.c [new file with mode: 0644]
gio/gpollableoutputstream.h [new file with mode: 0644]
gio/gsocketconnection.c
gio/gsocketinputstream.c
gio/gsocketoutputstream.c
gio/gunixinputstream.c
gio/gunixoutputstream.c
gio/tests/.gitignore
gio/tests/Makefile.am
gio/tests/pollable.c [new file with mode: 0644]

index 85c8fab..f628e45 100644 (file)
@@ -68,6 +68,8 @@
         <xi:include href="xml/gunixoutputstream.xml"/>
         <xi:include href="xml/gconverterinputstream.xml"/>
         <xi:include href="xml/gconverteroutputstream.xml"/>
         <xi:include href="xml/gunixoutputstream.xml"/>
         <xi:include href="xml/gconverterinputstream.xml"/>
         <xi:include href="xml/gconverteroutputstream.xml"/>
+        <xi:include href="xml/gpollableinputstream.xml"/>
+        <xi:include href="xml/gpollableoutputstream.xml"/>
     </chapter>
     <chapter id="types">
         <title>File types and applications</title>
     </chapter>
     <chapter id="types">
         <title>File types and applications</title>
index 25532ad..68d6b28 100644 (file)
@@ -2934,3 +2934,44 @@ G_IS_PERIODIC
 <SUBSECTION Private>
 g_periodic_get_type
 </SECTION>
 <SUBSECTION Private>
 g_periodic_get_type
 </SECTION>
+
+<SECTION>
+<FILE>gpollableinputstream</FILE>
+<TITLE>GPollableInputStream</TITLE>
+GPollableInputStream
+GPollableInputStreamInterface
+<SUBSECTION>
+g_pollable_input_stream_can_poll
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_create_source
+g_pollable_input_stream_read_nonblocking
+<SUBSECTION>
+GPollableSourceFunc
+g_pollable_source_new
+<SUBSECTION Standard>
+G_POLLABLE_INPUT_STREAM
+G_POLLABLE_INPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_INPUT_STREAM
+G_TYPE_POLLABLE_INPUT_STREAM
+<SUBSECTION Private>
+g_pollable_input_stream_get_type
+</SECTION>
+
+<SECTION>
+<FILE>gpollableoutputstream</FILE>
+<TITLE>GPollableOutputStream</TITLE>
+GPollableOutputStream
+GPollableOutputStreamInterface
+<SUBSECTION>
+g_pollable_output_stream_can_poll
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_create_source
+g_pollable_output_stream_write_nonblocking
+<SUBSECTION Standard>
+G_POLLABLE_OUTPUT_STREAM
+G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_OUTPUT_STREAM
+G_TYPE_POLLABLE_OUTPUT_STREAM
+<SUBSECTION Private>
+g_pollable_output_stream_get_type
+</SECTION>
index ec01b40..b047219 100644 (file)
@@ -76,6 +76,9 @@ g_output_stream_splice_flags_get_type
 g_password_save_get_type
 g_periodic_get_type
 g_permission_get_type
 g_password_save_get_type
 g_periodic_get_type
 g_permission_get_type
+g_pollable_input_stream_get_type
+g_pollable_io_stream_get_type
+g_pollable_output_stream_get_type
 g_proxy_address_enumerator_get_type
 g_proxy_address_get_type
 g_proxy_get_type
 g_proxy_address_enumerator_get_type
 g_proxy_address_get_type
 g_proxy_get_type
index 2aedfc1..5da34b2 100644 (file)
@@ -349,6 +349,8 @@ libgio_2_0_la_SOURCES =             \
        goutputstream.c         \
        gperiodic.c             \
        gpermission.c           \
        goutputstream.c         \
        gperiodic.c             \
        gpermission.c           \
+       gpollableinputstream.c  \
+       gpollableoutputstream.c \
        gpollfilemonitor.c      \
        gpollfilemonitor.h      \
        gproxyresolver.c        \
        gpollfilemonitor.c      \
        gpollfilemonitor.h      \
        gproxyresolver.c        \
@@ -505,6 +507,8 @@ gio_headers =                       \
        goutputstream.h         \
        gperiodic.h             \
        gpermission.h           \
        goutputstream.h         \
        gperiodic.h             \
        gpermission.h           \
+       gpollableinputstream.h  \
+       gpollableoutputstream.h \
        gproxyaddress.h         \
        gproxy.h                \
        gproxyaddressenumerator.h \
        gproxyaddress.h         \
        gproxy.h                \
        gproxyaddressenumerator.h \
index c18384f..0babc7c 100644 (file)
--- a/gio/gio.h
+++ b/gio/gio.h
@@ -95,6 +95,8 @@
 #include <gio/goutputstream.h>
 #include <gio/gperiodic.h>
 #include <gio/gpermission.h>
 #include <gio/goutputstream.h>
 #include <gio/gperiodic.h>
 #include <gio/gpermission.h>
+#include <gio/gpollableinputstream.h>
+#include <gio/gpollableoutputstream.h>
 #include <gio/gproxy.h>
 #include <gio/gproxyaddress.h>
 #include <gio/gproxyaddressenumerator.h>
 #include <gio/gproxy.h>
 #include <gio/gproxyaddress.h>
 #include <gio/gproxyaddressenumerator.h>
index b417eba..867cf63 100644 (file)
@@ -1973,3 +1973,24 @@ g_periodic_remove
 g_periodic_unblock
 #endif
 #endif
 g_periodic_unblock
 #endif
 #endif
+
+#if IN_HEADER(__G_POLLABLE_INPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_INPUT_STREAM_C__)
+g_pollable_input_stream_get_type G_GNUC_CONST
+g_pollable_input_stream_can_poll
+g_pollable_input_stream_create_source
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_read_nonblocking
+g_pollable_source_new
+#endif
+#endif
+
+#if IN_HEADER(__G_POLLABLE_OUTPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_OUTPUT_STREAM_C__)
+g_pollable_output_stream_get_type G_GNUC_CONST
+g_pollable_output_stream_can_poll
+g_pollable_output_stream_create_source
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_write_nonblocking
+#endif
+#endif
index 829d285..808e84d 100644 (file)
@@ -124,6 +124,8 @@ typedef struct _GNetworkAddress               GNetworkAddress;
 typedef struct _GNetworkService               GNetworkService;
 typedef struct _GOutputStream                 GOutputStream;
 typedef struct _GIOStream                     GIOStream;
 typedef struct _GNetworkService               GNetworkService;
 typedef struct _GOutputStream                 GOutputStream;
 typedef struct _GIOStream                     GIOStream;
+typedef struct _GPollableInputStream          GPollableInputStream; /* Dummy typedef */
+typedef struct _GPollableOutputStream         GPollableOutputStream; /* Dummy typedef */
 typedef struct _GResolver                     GResolver;
 typedef struct _GSeekable                     GSeekable;
 typedef struct _GSimpleAsyncResult            GSimpleAsyncResult;
 typedef struct _GResolver                     GResolver;
 typedef struct _GSeekable                     GSeekable;
 typedef struct _GSimpleAsyncResult            GSimpleAsyncResult;
@@ -391,6 +393,22 @@ typedef struct _GDBusNodeInfo                 GDBusNodeInfo;
 typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable,
                                            gpointer      user_data);
 
 typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable,
                                            gpointer      user_data);
 
+/**
+ * GPollableSourceFunc:
+ * @pollable_stream: the #GPollableInputStream or #GPollableOutputStream
+ * @user_data: data passed in by the user.
+ *
+ * This is the function type of the callback used for the #GSource
+ * returned by g_pollable_input_stream_create_source() and
+ * g_pollable_output_stream_create_source().
+ *
+ * Returns: it should return %FALSE if the source should be removed.
+ *
+ * Since: 2.28
+ */
+typedef gboolean (*GPollableSourceFunc) (GObject  *pollable_stream,
+                                        gpointer  user_data);
+
 G_END_DECLS
 
 #endif /* __GIO_TYPES_H__ */
 G_END_DECLS
 
 #endif /* __GIO_TYPES_H__ */
diff --git a/gio/gpollableinputstream.c b/gio/gpollableinputstream.c
new file mode 100644 (file)
index 0000000..19946c6
--- /dev/null
@@ -0,0 +1,304 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableinputstream.h"
+#include "gasynchelper.h"
+#include "gio-marshal.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableinputstream
+ * @short_description: Interface for pollable input streams
+ * @include: gio/gio.h
+ * @see_also: #GInputStream, #GPollableOutputStream, #GFileDescriptorBased
+ *
+ * #GPollableInputStream is implemented by #GInputStream<!-- -->s that
+ * can be polled for readiness to read. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableInputStream, g_pollable_input_stream, G_TYPE_INPUT_STREAM)
+
+static gboolean g_pollable_input_stream_default_can_poll         (GPollableInputStream *stream);
+static gssize   g_pollable_input_stream_default_read_nonblocking (GPollableInputStream  *stream,
+                                                                 void                  *buffer,
+                                                                 gsize                  size,
+                                                                 GError               **error);
+
+static void
+g_pollable_input_stream_default_init (GPollableInputStreamInterface *iface)
+{
+  iface->can_poll         = g_pollable_input_stream_default_can_poll;
+  iface->read_nonblocking = g_pollable_input_stream_default_read_nonblocking;
+}
+
+static gboolean
+g_pollable_input_stream_default_can_poll (GPollableInputStream *stream)
+{
+  return TRUE;
+}
+
+/**
+ * g_pollable_input_stream_can_poll:
+ * @stream: a #GPollableInputStream.
+ *
+ * Checks if @stream is actually pollable. Some classes may implement
+ * #GPollableInputStream but have only certain instances of that class
+ * be pollable. If this method returns %FALSE, then the behavior of
+ * other #GPollableInputStream methods is undefined.
+ *
+ * For any given stream, the value returned by this method is constant;
+ * a stream cannot switch from pollable to non-pollable or vice versa.
+ *
+ * Returns: %TRUE if @stream is pollable, %FALSE if not.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_input_stream_can_poll (GPollableInputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
+
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
+}
+
+/**
+ * g_pollable_input_stream_is_readable:
+ * @stream: a #GPollableInputStream.
+ *
+ * Checks if @stream can be read.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_input_stream_read()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_input_stream_read_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is readable, %FALSE if not. If an error
+ *   has occurred on @stream, this will result in
+ *   g_pollable_input_stream_is_readable() returning %TRUE, and the
+ *   next attempt to read will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_input_stream_is_readable (GPollableInputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
+
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->is_readable (stream);
+}
+
+/**
+ * g_pollable_input_stream_create_source:
+ * @stream: a #GPollableInputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be read, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_input_stream_is_readable(), it is possible that
+ * the stream may not actually be readable even after the source
+ * triggers, so you should use
+ * g_pollable_input_stream_read_nonblocking() rather than
+ * g_input_stream_read() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_input_stream_create_source (GPollableInputStream *stream,
+                                      GCancellable         *cancellable)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), NULL);
+
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+         create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_input_stream_default_read_nonblocking (GPollableInputStream  *stream,
+                                                 void                  *buffer,
+                                                 gsize                  size,
+                                                 GError               **error)
+{
+  if (!g_pollable_input_stream_is_readable (stream))
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                  g_strerror (EAGAIN));
+      return -1;
+    }
+
+  return g_input_stream_read (G_INPUT_STREAM (stream), buffer, size,
+                             NULL, error);
+}
+
+/**
+ * g_pollable_input_stream_read_nonblocking:
+ * @stream: a #GPollableInputStream
+ * @buffer: a buffer to read data into (which should be at least @size
+ *     bytes long).
+ * @size: the number of bytes you want to read
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to read up to @size bytes from @stream into @buffer, as
+ * with g_input_stream_read(). If @stream is not currently readable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_input_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is readable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes read, or -1 on error (including
+ *   %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_input_stream_read_nonblocking (GPollableInputStream  *stream,
+                                         void                  *buffer,
+                                         gsize                  size,
+                                         GCancellable          *cancellable,
+                                         GError               **error)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), -1);
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+    read_nonblocking (stream, buffer, size, error);
+}
+
+/* GPollableSource */
+
+typedef struct {
+  GSource       source;
+
+  GObject      *stream;
+} GPollableSource;
+
+static gboolean
+pollable_source_prepare (GSource *source,
+                        gint    *timeout)
+{
+  *timeout = -1;
+  return FALSE;
+}
+
+static gboolean
+pollable_source_check (GSource *source)
+{
+  return FALSE;
+}
+
+static gboolean
+pollable_source_dispatch (GSource     *source,
+                         GSourceFunc  callback,
+                         gpointer     user_data)
+{
+  GPollableSourceFunc func = (GPollableSourceFunc)callback;
+  GPollableSource *pollable_source = (GPollableSource *)source;
+
+  return (*func) (pollable_source->stream, user_data);
+}
+
+static void
+pollable_source_finalize (GSource *source)
+{
+  GPollableSource *pollable_source = (GPollableSource *)source;
+
+  g_object_unref (pollable_source->stream);
+}
+
+static gboolean
+pollable_source_closure_callback (GObject  *stream,
+                                 gpointer  data)
+{
+  GClosure *closure = data;
+
+  GValue param = { 0, };
+  GValue result_value = { 0, };
+  gboolean result;
+
+  g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+  g_value_init (&param, G_TYPE_OBJECT);
+  g_value_set_object (&param, stream);
+
+  g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+  result = g_value_get_boolean (&result_value);
+  g_value_unset (&result_value);
+  g_value_unset (&param);
+
+  return result;
+}
+
+static GSourceFuncs pollable_source_funcs =
+{
+  pollable_source_prepare,
+  pollable_source_check,
+  pollable_source_dispatch,
+  pollable_source_finalize,
+  (GSourceFunc)pollable_source_closure_callback,
+  (GSourceDummyMarshal)_gio_marshal_BOOLEAN__VOID,
+};
+
+/**
+ * g_pollable_source_new:
+ * @pollable_stream: the stream associated with the new source
+ *
+ * Utility method for #GPollableInputStream and #GPollableOutputStream
+ * implementations. Creates a new #GSource that expects a callback of
+ * type #GPollableSourceFunc. The new source does not actually do
+ * anything on its own; use g_source_add_child_source() to add other
+ * sources to it to cause it to trigger.
+ *
+ * Return value: the new #GSource.
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_source_new (GObject *pollable_stream)
+{
+  GSource *source;
+  GPollableSource *pollable_source;
+
+  source = g_source_new (&pollable_source_funcs, sizeof (GPollableSource));
+  g_source_set_name (source, "GPollableSource");
+  pollable_source = (GPollableSource *)source;
+  pollable_source->stream = g_object_ref (pollable_stream);
+
+  return source;
+}
diff --git a/gio/gpollableinputstream.h b/gio/gpollableinputstream.h
new file mode 100644 (file)
index 0000000..5def93b
--- /dev/null
@@ -0,0 +1,101 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_INPUT_STREAM_H__
+#define __G_POLLABLE_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_INPUT_STREAM               (g_pollable_input_stream_get_type ())
+#define G_POLLABLE_INPUT_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStream))
+#define G_IS_POLLABLE_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_INPUT_STREAM))
+#define G_POLLABLE_INPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStreamInterface))
+
+/**
+ * GPollableInputStream:
+ *
+ * An interface for a #GInputStream that can be polled for readability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableInputStreamInterface GPollableInputStreamInterface;
+
+/**
+ * GPollableInputStreamInterface:
+ * @g_iface: The parent interface.
+ * @can_poll: Checks if the #GPollableInputStream instance is actually pollable
+ * @is_readable: Checks if the stream is readable
+ * @create_source: Creates a #GSource to poll the stream
+ * @read_nonblocking: Does a non-blocking read or returns
+ *   %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable input streams.
+ *
+ * The default implementation of @can_poll always returns %TRUE.
+ *
+ * The default implementation of @read_nonblocking calls
+ * g_pollable_input_stream_is_readable(), and then calls
+ * g_input_stream_read() if it returns %TRUE. This means you only need
+ * to override it if it is possible that your @is_readable
+ * implementation may return %TRUE when the stream is not actually
+ * readable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableInputStreamInterface
+{
+  GTypeInterface g_iface;
+
+  /* Virtual Table */
+  gboolean     (*can_poll)         (GPollableInputStream  *stream);
+
+  gboolean     (*is_readable)      (GPollableInputStream  *stream);
+  GSource *    (*create_source)    (GPollableInputStream  *stream,
+                                   GCancellable          *cancellable);
+  gssize       (*read_nonblocking) (GPollableInputStream  *stream,
+                                   void                  *buffer,
+                                   gsize                  size,
+                                   GError               **error);
+};
+
+GType    g_pollable_input_stream_get_type         (void) G_GNUC_CONST;
+
+gboolean g_pollable_input_stream_can_poll         (GPollableInputStream  *stream);
+
+gboolean g_pollable_input_stream_is_readable      (GPollableInputStream  *stream);
+GSource *g_pollable_input_stream_create_source    (GPollableInputStream  *stream,
+                                                  GCancellable          *cancellable);
+
+gssize   g_pollable_input_stream_read_nonblocking (GPollableInputStream  *stream,
+                                                  void                  *buffer,
+                                                  gsize                  size,
+                                                  GCancellable          *cancellable,
+                                                  GError               **error);
+
+/* Helper method for stream implementations */
+GSource *g_pollable_source_new                    (GObject               *stream);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_INPUT_STREAM_H__ */
+
diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c
new file mode 100644 (file)
index 0000000..cb9138a
--- /dev/null
@@ -0,0 +1,201 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableoutputstream.h"
+#include "gasynchelper.h"
+#include "gfiledescriptorbased.h"
+#include "gio-marshal.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableoutputstream
+ * @short_description: Interface for pollable output streams
+ * @include: gio/gio.h
+ * @see_also: #GOutputStream, #GFileDescriptorBased, #GPollableInputStream
+ *
+ * #GPollableOutputStream is implemented by #GOutputStream<!-- -->s that
+ * can be polled for readiness to write. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
+
+static gboolean g_pollable_output_stream_default_can_poll          (GPollableOutputStream *stream);
+static gssize   g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stream,
+                                                                   const void             *buffer,
+                                                                   gsize                   size,
+                                                                   GError                **error);
+
+static void
+g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
+{
+  iface->can_poll          = g_pollable_output_stream_default_can_poll;
+  iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+}
+
+static gboolean
+g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream)
+{
+  return TRUE;
+}
+
+/**
+ * g_pollable_output_stream_can_poll:
+ * @stream: a #GPollableOutputStream.
+ *
+ * Checks if @stream is actually pollable. Some classes may implement
+ * #GPollableOutputStream but have only certain instances of that
+ * class be pollable. If this method returns %FALSE, then the behavior
+ * of other #GPollableOutputStream methods is undefined.
+ *
+ * For any given stream, the value returned by this method is constant;
+ * a stream cannot switch from pollable to non-pollable or vice versa.
+ *
+ * Returns: %TRUE if @stream is pollable, %FALSE if not.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_output_stream_can_poll (GPollableOutputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
+
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->can_poll (stream);
+}
+
+/**
+ * g_pollable_output_stream_is_writable:
+ * @stream: a #GPollableOutputStream.
+ *
+ * Checks if @stream can be written.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_output_stream_write()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_output_stream_write_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is writable, %FALSE if not. If an error
+ *   has occurred on @stream, this will result in
+ *   g_pollable_output_stream_is_writable() returning %TRUE, and the
+ *   next attempt to write will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
+
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->is_writable (stream);
+}
+
+/**
+ * g_pollable_output_stream_create_source:
+ * @stream: a #GPollableOutputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be written, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_output_stream_is_writable(), it is possible that
+ * the stream may not actually be writable even after the source
+ * triggers, so you should use
+ * g_pollable_output_stream_write_nonblocking() rather than
+ * g_output_stream_write() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_output_stream_create_source (GPollableOutputStream *stream,
+                                       GCancellable          *cancellable)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), NULL);
+
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+         create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stream,
+                                                   const void             *buffer,
+                                                   gsize                   size,
+                                                   GError                **error)
+{
+  if (!g_pollable_output_stream_is_writable (stream))
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                  g_strerror (EAGAIN));
+      return -1;
+    }
+
+  return g_output_stream_write (G_OUTPUT_STREAM (stream), buffer, size,
+                               NULL, error);
+}
+
+/**
+ * g_pollable_output_stream_write_nonblocking:
+ * @stream: a #GPollableOutputStream
+ * @buffer: a buffer to write data from
+ * @size: the number of bytes you want to write
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to write up to @size bytes from @buffer to @stream, as
+ * with g_output_stream_write(). If @stream is not currently writable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_output_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is writable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes written, or -1 on error (including
+ *   %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+                                           const void             *buffer,
+                                           gsize                   size,
+                                           GCancellable           *cancellable,
+                                           GError                **error)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), -1);
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+    write_nonblocking (stream, buffer, size, error);
+}
diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h
new file mode 100644 (file)
index 0000000..abef0ed
--- /dev/null
@@ -0,0 +1,98 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_OUTPUT_STREAM_H__
+#define __G_POLLABLE_OUTPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_OUTPUT_STREAM               (g_pollable_output_stream_get_type ())
+#define G_POLLABLE_OUTPUT_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStream))
+#define G_IS_POLLABLE_OUTPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM))
+#define G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStreamInterface))
+
+/**
+ * GPollableOutputStream:
+ *
+ * An interface for a #GOutputStream that can be polled for readability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableOutputStreamInterface GPollableOutputStreamInterface;
+
+/**
+ * GPollableOutputStreamInterface:
+ * @g_iface: The parent interface.
+ * @can_poll: Checks if the #GPollableOutputStream instance is actually pollable
+ * @is_writable: Checks if the stream is writable
+ * @create_source: Creates a #GSource to poll the stream
+ * @write_nonblocking: Does a non-blocking write or returns
+ *   %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable output streams.
+ *
+ * The default implementation of @can_poll always returns %TRUE.
+ *
+ * The default implementation of @write_nonblocking calls
+ * g_pollable_output_stream_is_writable(), and then calls
+ * g_output_stream_write() if it returns %TRUE. This means you only
+ * need to override it if it is possible that your @is_writable
+ * implementation may return %TRUE when the stream is not actually
+ * writable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableOutputStreamInterface
+{
+  GTypeInterface g_iface;
+
+  /* Virtual Table */
+  gboolean     (*can_poll)          (GPollableOutputStream  *stream);
+
+  gboolean     (*is_writable)       (GPollableOutputStream  *stream);
+  GSource *    (*create_source)     (GPollableOutputStream  *stream,
+                                    GCancellable           *cancellable);
+  gssize       (*write_nonblocking) (GPollableOutputStream  *stream,
+                                    const void             *buffer,
+                                    gsize                   size,
+                                    GError                **error);
+};
+
+GType    g_pollable_output_stream_get_type          (void) G_GNUC_CONST;
+
+gboolean g_pollable_output_stream_can_poll          (GPollableOutputStream  *stream);
+
+gboolean g_pollable_output_stream_is_writable       (GPollableOutputStream  *stream);
+GSource *g_pollable_output_stream_create_source     (GPollableOutputStream  *stream,
+                                                    GCancellable           *cancellable);
+
+gssize   g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+                                                    const void             *buffer,
+                                                    gsize                   size,
+                                                    GCancellable           *cancellable,
+                                                    GError                **error);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_OUTPUT_STREAM_H__ */
+
index 495f81c..852805a 100644 (file)
@@ -60,8 +60,7 @@
  * Since: 2.22
  */
 
  * Since: 2.22
  */
 
-G_DEFINE_TYPE (GSocketConnection,
-              g_socket_connection, G_TYPE_IO_STREAM);
+G_DEFINE_TYPE (GSocketConnection, g_socket_connection, G_TYPE_IO_STREAM);
 
 enum
 {
 
 enum
 {
index 4a27d90..fb9da1b 100644 (file)
 #include "gsocketinputstream.h"
 #include "glibintl.h"
 
 #include "gsocketinputstream.h"
 #include "glibintl.h"
 
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gioerror.h"
 
 
 
 
+static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
 #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
 #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
-G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
+                        )
 
 enum
 {
 
 enum
 {
@@ -205,6 +210,44 @@ g_socket_input_stream_read_finish (GInputStream  *stream,
   return count;
 }
 
   return count;
 }
 
+static gboolean
+g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+  return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
+}
+
+static GSource *
+g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
+                                             GCancellable         *cancellable)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+  GSource *socket_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
+  socket_source = g_socket_create_source (input_stream->priv->socket,
+                                         G_IO_IN, cancellable);
+  g_source_set_dummy_callback (socket_source);
+  g_source_add_child_source (pollable_source, socket_source);
+  g_source_unref (socket_source);
+
+  return pollable_source;
+}
+
+static gssize
+g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
+                                                void                  *buffer,
+                                                gsize                  size,
+                                                GError               **error)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+  return g_socket_receive_with_blocking (input_stream->priv->socket,
+                                        buffer, size, FALSE,
+                                        NULL, error);
+}
+
 static void
 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
 {
 static void
 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
 {
@@ -230,6 +273,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
 }
 
 static void
 }
 
 static void
+g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable = g_socket_input_stream_pollable_is_readable;
+  iface->create_source = g_socket_input_stream_pollable_create_source;
+  iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
+}
+
+static void
 g_socket_input_stream_init (GSocketInputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
 g_socket_input_stream_init (GSocketInputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
index 4febf88..0ef3360 100644 (file)
 #include "gsocketoutputstream.h"
 #include "gsocket.h"
 
 #include "gsocketoutputstream.h"
 #include "gsocket.h"
 
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gpollableoutputstream.h"
+#include "gioerror.h"
 #include "glibintl.h"
 
 
 #include "glibintl.h"
 
 
+static void g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
 #define g_socket_output_stream_get_type _g_socket_output_stream_get_type
 #define g_socket_output_stream_get_type _g_socket_output_stream_get_type
-G_DEFINE_TYPE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, g_socket_output_stream_pollable_iface_init)
+                        )
 
 enum
 {
 
 enum
 {
@@ -207,6 +213,44 @@ g_socket_output_stream_write_finish (GOutputStream  *stream,
   return count;
 }
 
   return count;
 }
 
+static gboolean
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+  return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+}
+
+static GSource *
+g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
+                                              GCancellable          *cancellable)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+  GSource *socket_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (output_stream));
+  socket_source = g_socket_create_source (output_stream->priv->socket,
+                                         G_IO_OUT, cancellable);
+  g_source_set_dummy_callback (socket_source);
+  g_source_add_child_source (pollable_source, socket_source);
+  g_source_unref (socket_source);
+
+  return pollable_source;
+}
+
+static gssize
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream  *pollable,
+                                                  const void             *buffer,
+                                                  gsize                   size,
+                                                  GError                **error)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+  return g_socket_send_with_blocking (output_stream->priv->socket,
+                                     buffer, size, FALSE,
+                                     NULL, error);
+}
+
 static void
 g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
 {
 static void
 g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
 {
@@ -232,6 +276,14 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
 }
 
 static void
 }
 
 static void
+g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = g_socket_output_stream_pollable_is_writable;
+  iface->create_source = g_socket_output_stream_pollable_create_source;
+  iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking;
+}
+
+static void
 g_socket_output_stream_init (GSocketOutputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_OUTPUT_STREAM, GSocketOutputStreamPrivate);
 g_socket_output_stream_init (GSocketOutputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_OUTPUT_STREAM, GSocketOutputStreamPrivate);
index e1ee34a..43a9247 100644 (file)
@@ -60,7 +60,12 @@ enum {
   PROP_CLOSE_FD
 };
 
   PROP_CLOSE_FD
 };
 
-G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM);
+static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                               g_unix_input_stream_pollable_iface_init)
+                        );
 
 struct _GUnixInputStreamPrivate {
   int fd;
 
 struct _GUnixInputStreamPrivate {
   int fd;
@@ -111,6 +116,9 @@ static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
 
                                                  GAsyncResult         *result,
                                                  GError              **error);
 
+static gboolean g_unix_input_stream_pollable_is_readable   (GPollableInputStream *stream);
+static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+                                                           GCancellable         *cancellable);
 
 static void
 g_unix_input_stream_finalize (GObject *object)
 
 static void
 g_unix_input_stream_finalize (GObject *object)
@@ -175,6 +183,13 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
 }
 
 static void
 }
 
 static void
+g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable = g_unix_input_stream_pollable_is_readable;
+  iface->create_source = g_unix_input_stream_pollable_create_source;
+}
+
+static void
 g_unix_input_stream_set_property (GObject         *object,
                                  guint            prop_id,
                                  const GValue    *value,
 g_unix_input_stream_set_property (GObject         *object,
                                  guint            prop_id,
                                  const GValue    *value,
@@ -637,3 +652,37 @@ g_unix_input_stream_close_finish (GInputStream  *stream,
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
+
+static gboolean
+g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
+{
+  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+  GPollFD poll_fd;
+  gint result;
+
+  poll_fd.fd = unix_stream->priv->fd;
+  poll_fd.events = G_IO_IN;
+
+  do
+    result = g_poll (&poll_fd, 1, 0);
+  while (result == -1 && errno == EINTR);
+
+  return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+                                           GCancellable         *cancellable)
+{
+  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+  GSource *inner_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+  inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
+  g_source_set_dummy_callback (inner_source);
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source);
+
+  return pollable_source;
+}
index a0acc31..81021be 100644 (file)
@@ -60,8 +60,12 @@ enum {
   PROP_CLOSE_FD
 };
 
   PROP_CLOSE_FD
 };
 
-G_DEFINE_TYPE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM);
+static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
 
 
+G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+                                               g_unix_output_stream_pollable_iface_init)
+                        );
 
 struct _GUnixOutputStreamPrivate {
   int fd;
 
 struct _GUnixOutputStreamPrivate {
   int fd;
@@ -103,6 +107,9 @@ static gboolean g_unix_output_stream_close_finish (GOutputStream        *stream,
                                                   GAsyncResult         *result,
                                                   GError              **error);
 
                                                   GAsyncResult         *result,
                                                   GError              **error);
 
+static gboolean g_unix_output_stream_pollable_is_writable   (GPollableOutputStream *stream);
+static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+                                                            GCancellable         *cancellable);
 
 static void
 g_unix_output_stream_finalize (GObject *object)
 
 static void
 g_unix_output_stream_finalize (GObject *object)
@@ -161,6 +168,13 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
 }
 
 static void
 }
 
 static void
+g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = g_unix_output_stream_pollable_is_writable;
+  iface->create_source = g_unix_output_stream_pollable_create_source;
+}
+
+static void
 g_unix_output_stream_set_property (GObject         *object,
                                   guint            prop_id,
                                   const GValue    *value,
 g_unix_output_stream_set_property (GObject         *object,
                                   guint            prop_id,
                                   const GValue    *value,
@@ -593,3 +607,37 @@ g_unix_output_stream_close_finish (GOutputStream  *stream,
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
+
+static gboolean
+g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream)
+{
+  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+  GPollFD poll_fd;
+  gint result;
+
+  poll_fd.fd = unix_stream->priv->fd;
+  poll_fd.events = G_IO_OUT;
+
+  do
+    result = g_poll (&poll_fd, 1, 0);
+  while (result == -1 && errno == EINTR);
+
+  return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+                                            GCancellable          *cancellable)
+{
+  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+  GSource *inner_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+  inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable);
+  g_source_set_dummy_callback (inner_source);
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source);
+
+  return pollable_source;
+}
index c987b64..0663fef 100644 (file)
@@ -60,6 +60,7 @@ memory-input-stream
 memory-output-stream
 network-address
 org.gtk.test.enums.xml
 memory-output-stream
 network-address
 org.gtk.test.enums.xml
+pollable
 proxy
 readwrite
 resolver
 proxy
 readwrite
 resolver
index 5e65c08..ad2273a 100644 (file)
@@ -43,6 +43,7 @@ TEST_PROGS +=                 \
        network-address         \
        gdbus-message           \
        socket                  \
        network-address         \
        gdbus-message           \
        socket                  \
+       pollable                \
        $(NULL)
 
 if OS_UNIX
        $(NULL)
 
 if OS_UNIX
@@ -204,6 +205,9 @@ network_address_LDADD         = $(progs_ldadd)
 socket_SOURCE            = socket.c
 socket_LDADD             = $(progs_ldadd)
 
 socket_SOURCE            = socket.c
 socket_LDADD             = $(progs_ldadd)
 
+pollable_SOURCE                  = pollable.c
+pollable_LDADD           = $(progs_ldadd)
+
 contexts_SOURCES         = contexts.c
 contexts_LDADD           = $(progs_ldadd) \
        $(top_builddir)/gthread/libgthread-2.0.la
 contexts_SOURCES         = contexts.c
 contexts_LDADD           = $(progs_ldadd) \
        $(top_builddir)/gthread/libgthread-2.0.la
diff --git a/gio/tests/pollable.c b/gio/tests/pollable.c
new file mode 100644 (file)
index 0000000..8669e2b
--- /dev/null
@@ -0,0 +1,240 @@
+/* GLib testing framework examples and tests
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <gio/gio.h>
+
+#ifdef G_OS_UNIX
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#endif
+
+GMainLoop *loop;
+GPollableInputStream *in;
+GOutputStream *out;
+
+static gboolean
+poll_source_callback (GPollableInputStream *in,
+                     gpointer              user_data)
+{
+  GError *error = NULL;
+  char buf[2];
+  gssize nread;
+  gboolean *success = user_data;
+
+  nread = g_pollable_input_stream_read_nonblocking (in, buf, 2, NULL, &error);
+  g_assert_no_error (error);
+  g_assert_cmpint (nread, ==, 2);
+  g_assert_cmpstr (buf, ==, "x");
+
+  *success = TRUE;
+  return FALSE;
+}
+
+static gboolean
+check_source_readability_callback (gpointer user_data)
+{
+  gboolean expected = GPOINTER_TO_INT (user_data);
+  gboolean readable;
+
+  readable = g_pollable_input_stream_is_readable (in);
+  g_assert_cmpint (readable, ==, expected);
+  return FALSE;
+}
+
+static gboolean
+write_callback (gpointer user_data)
+{
+  char *buf = "x";
+  gssize nwrote;
+  GError *error = NULL;
+
+  nwrote = g_output_stream_write (out, buf, 2, NULL, &error);
+  g_assert_no_error (error);
+  g_assert_cmpint (nwrote, ==, 2);
+
+  check_source_readability_callback (GINT_TO_POINTER (TRUE));
+
+  return FALSE;
+}
+
+static gboolean
+check_source_and_quit_callback (gpointer user_data)
+{
+  check_source_readability_callback (user_data);
+  g_main_loop_quit (loop);
+  return FALSE;
+}
+
+static void
+test_streams (void)
+{
+  gboolean readable;
+  GError *error = NULL;
+  char buf[1];
+  gssize nread;
+  GSource *poll_source;
+  gboolean success = FALSE;
+
+  readable = g_pollable_input_stream_is_readable (in);
+  g_assert (!readable);
+
+  nread = g_pollable_input_stream_read_nonblocking (in, buf, 1, NULL, &error);
+  g_assert_cmpint (nread, ==, -1);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+  g_clear_error (&error);
+
+  /* Create 4 sources, in decreasing order of priority:
+   *   1. poll source on @in
+   *   2. idle source that checks if @in is readable once
+   *      (it won't be) and then removes itself
+   *   3. idle source that writes a byte to @out, checks that
+   *      @in is now readable, and removes itself
+   *   4. idle source that checks if @in is readable once
+   *      (it won't be, since the poll source will fire before
+   *      this one does) and then quits the loop.
+   *
+   * If the poll source triggers before it should, then it will get a
+   * %G_IO_ERROR_WOULD_BLOCK, and if check() fails in either
+   * direction, we will catch it at some point.
+   */
+
+  poll_source = g_pollable_input_stream_create_source (in, NULL);
+  g_source_set_priority (poll_source, 1);
+  g_source_set_callback (poll_source, (GSourceFunc) poll_source_callback, &success, NULL);
+  g_source_attach (poll_source, NULL);
+  g_source_unref (poll_source);
+
+  g_idle_add_full (2, check_source_readability_callback, GINT_TO_POINTER (FALSE), NULL);
+  g_idle_add_full (3, write_callback, NULL, NULL);
+  g_idle_add_full (4, check_source_and_quit_callback, GINT_TO_POINTER (FALSE), NULL);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  g_main_loop_run (loop);
+  g_main_loop_unref (loop);
+
+  g_assert_cmpint (success, ==, TRUE);
+}
+
+#ifdef G_OS_UNIX
+static void
+test_pollable_unix (void)
+{
+  int pipefds[2], status;
+
+  status = pipe (pipefds);
+  g_assert_cmpint (status, ==, 0);
+
+  in = G_POLLABLE_INPUT_STREAM (g_unix_input_stream_new (pipefds[0], TRUE));
+  out = g_unix_output_stream_new (pipefds[1], TRUE);
+
+  test_streams ();
+
+  g_object_unref (in);
+  g_object_unref (out);
+}
+#endif
+
+static void
+client_connected (GObject      *source,
+                 GAsyncResult *result,
+                 gpointer      user_data)
+{
+  GSocketClient *client = G_SOCKET_CLIENT (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_client_connect_finish (client, result, &error);
+  g_assert_no_error (error);
+}
+
+static void
+server_connected (GObject      *source,
+                 GAsyncResult *result,
+                 gpointer      user_data)
+{
+  GSocketListener *listener = G_SOCKET_LISTENER (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
+  g_assert_no_error (error);
+}
+
+static void
+test_pollable_socket (void)
+{
+  GInetAddress *iaddr;
+  GSocketAddress *saddr, *effective_address;
+  GSocketListener *listener;
+  GSocketClient *client;
+  GError *error = NULL;
+  GSocketConnection *client_conn = NULL, *server_conn = NULL;
+
+  iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
+  saddr = g_inet_socket_address_new (iaddr, 0);
+  g_object_unref (iaddr);
+
+  listener = g_socket_listener_new ();
+  g_socket_listener_add_address (listener, saddr,
+                                G_SOCKET_TYPE_STREAM,
+                                G_SOCKET_PROTOCOL_TCP,
+                                NULL,
+                                &effective_address,
+                                &error);
+  g_assert_no_error (error);
+  g_object_unref (saddr);
+
+  client = g_socket_client_new ();
+
+  g_socket_client_connect_async (client,
+                                G_SOCKET_CONNECTABLE (effective_address),
+                                NULL, client_connected, &client_conn);
+  g_socket_listener_accept_async (listener, NULL,
+                                 server_connected, &server_conn);
+
+  while (!client_conn || !server_conn)
+    g_main_context_iteration (NULL, TRUE);
+
+  in = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (G_IO_STREAM (client_conn)));
+  out = g_io_stream_get_output_stream (G_IO_STREAM (server_conn));
+
+  test_streams ();
+
+  g_object_unref (client_conn);
+  g_object_unref (server_conn);
+  g_object_unref (client);
+  g_object_unref (listener);
+}
+
+int
+main (int   argc,
+      char *argv[])
+{
+  g_type_init ();
+  g_test_init (&argc, &argv, NULL);
+
+#ifdef G_OS_UNIX
+  g_test_add_func ("/pollable/unix", test_pollable_unix);
+#endif
+  g_test_add_func ("/pollable/socket", test_pollable_socket);
+
+  return g_test_run();
+}
+