Add g_input_stream_read_all_async()
authorRyan Lortie <desrt@desrt.ca>
Mon, 29 Sep 2014 15:40:46 +0000 (11:40 -0400)
committerRyan Lortie <desrt@desrt.ca>
Tue, 21 Oct 2014 15:31:45 +0000 (11:31 -0400)
Add an asynchronous version of _read_all().

This API is not fully consistent with the normal expectations of a
non-asynchronous version.  Consistency between the sync and async version is
probably more important.

The API will still bind correctly, but access to all functionality will
not be available: specifically, in the case of an error, higher level
languages will be unable to determine how many bytes were successfully
read before the error.  Most users will probably not want to use this
information anyway, so this is OK -- and if they do need the
information, then they can just write the loop for themselves.

Heavily based on a patch from Ignacio Casal Quinteiro.

https://bugzilla.gnome.org/show_bug.cgi?id=737451

gio/ginputstream.c
gio/ginputstream.h

index b0dce76..ab8ceb4 100644 (file)
@@ -655,6 +655,185 @@ g_input_stream_read_finish (GInputStream  *stream,
   return class->read_finish (stream, result, error);
 }
 
+typedef struct
+{
+  gchar *buffer;
+  gsize to_read;
+  gsize bytes_read;
+} AsyncReadAll;
+
+static void
+free_async_read_all (gpointer data)
+{
+  g_slice_free (AsyncReadAll, data);
+}
+
+static void
+read_all_callback (GObject      *stream,
+                   GAsyncResult *result,
+                   gpointer      user_data)
+{
+  GTask *task = user_data;
+  AsyncReadAll *data = g_task_get_task_data (task);
+  gboolean got_eof = FALSE;
+
+  if (result)
+    {
+      GError *error = NULL;
+      gssize nread;
+
+      nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
+
+      if (nread == -1)
+        {
+          g_task_return_error (task, error);
+          g_object_unref (task);
+          return;
+        }
+
+      g_assert_cmpint (nread, <=, data->to_read);
+      data->to_read -= nread;
+      data->bytes_read += nread;
+      got_eof = (nread == 0);
+    }
+
+  if (got_eof || data->to_read == 0)
+    {
+      g_task_return_boolean (task, TRUE);
+      g_object_unref (task);
+    }
+
+  else
+    g_input_stream_read_async (G_INPUT_STREAM (stream),
+                               data->buffer + data->bytes_read,
+                               data->to_read,
+                               g_task_get_priority (task),
+                               g_task_get_cancellable (task),
+                               read_all_callback, task);
+}
+
+
+static void
+read_all_async_thread (GTask        *task,
+                       gpointer      source_object,
+                       gpointer      task_data,
+                       GCancellable *cancellable)
+{
+  GInputStream *stream = source_object;
+  AsyncReadAll *data = task_data;
+  GError *error = NULL;
+
+  if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
+                               g_task_get_cancellable (task), &error))
+    g_task_return_boolean (task, TRUE);
+  else
+    g_task_return_error (task, error);
+}
+
+/**
+ * g_input_stream_read_all_async:
+ * @stream: A #GInputStream
+ * @buffer: (array length=count) (element-type guint8): a buffer to
+ *     read data into (which should be at least count bytes long)
+ * @count: the number of bytes that will be read from the stream
+ * @io_priority: the [I/O priority][io-priority] of the request
+ * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
+ * @callback: (scope async): callback to call when the request is satisfied
+ * @user_data: (closure): the data to pass to callback function
+ *
+ * Request an asynchronous read of @count bytes from the stream into the
+ * buffer starting at @buffer.
+ *
+ * This is the asynchronous equivalent of g_input_stream_read_all().
+ *
+ * Call g_input_stream_read_all_finish() to collect the result.
+ *
+ * Any outstanding I/O request with higher priority (lower numerical
+ * value) will be executed before an outstanding request with lower
+ * priority. Default priority is %G_PRIORITY_DEFAULT.
+ *
+ * Since: 2.44
+ **/
+void
+g_input_stream_read_all_async (GInputStream        *stream,
+                               void                *buffer,
+                               gsize                count,
+                               int                  io_priority,
+                               GCancellable        *cancellable,
+                               GAsyncReadyCallback  callback,
+                               gpointer             user_data)
+{
+  AsyncReadAll *data;
+  GTask *task;
+
+  g_return_if_fail (G_IS_INPUT_STREAM (stream));
+  g_return_if_fail (buffer != NULL || count == 0);
+
+  task = g_task_new (stream, cancellable, callback, user_data);
+  data = g_slice_new0 (AsyncReadAll);
+  data->buffer = buffer;
+  data->to_read = count;
+
+  g_task_set_task_data (task, data, free_async_read_all);
+  g_task_set_priority (task, io_priority);
+
+  /* If async reads are going to be handled via the threadpool anyway
+   * then we may as well do it with a single dispatch instead of
+   * bouncing in and out.
+   */
+  if (g_input_stream_async_read_is_via_threads (stream))
+    {
+      g_task_run_in_thread (task, read_all_async_thread);
+      g_object_unref (task);
+    }
+  else
+    read_all_callback (G_OBJECT (stream), NULL, task);
+}
+
+/**
+ * g_input_stream_read_all_finish:
+ * @stream: a #GInputStream
+ * @result: a #GAsyncResult
+ * @bytes_read: (out): location to store the number of bytes that was read from the stream
+ * @error: a #GError location to store the error occurring, or %NULL to ignore
+ *
+ * Finishes an asynchronous stream read operation started with
+ * g_input_stream_read_all_async().
+ *
+ * As a special exception to the normal conventions for functions that
+ * use #GError, if this function returns %FALSE (and sets @error) then
+ * @bytes_read will be set to the number of bytes that were successfully
+ * read before the error was encountered.  This functionality is only
+ * available from C.  If you need it from another language then you must
+ * write your own loop around g_input_stream_read_async().
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.44
+ **/
+gboolean
+g_input_stream_read_all_finish (GInputStream  *stream,
+                                GAsyncResult  *result,
+                                gsize         *bytes_read,
+                                GError       **error)
+{
+  GTask *task;
+
+  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+
+  task = G_TASK (result);
+
+  if (bytes_read)
+    {
+      AsyncReadAll *data = g_task_get_task_data (task);
+
+      *bytes_read = data->bytes_read;
+    }
+
+  return g_task_propagate_boolean (task, error);
+}
+
 static void
 read_bytes_callback (GObject      *stream,
                     GAsyncResult *result,
index a9c269c..e6879ec 100644 (file)
@@ -151,6 +151,21 @@ GLIB_AVAILABLE_IN_ALL
 gssize   g_input_stream_read_finish   (GInputStream          *stream,
                                       GAsyncResult          *result,
                                       GError               **error);
+
+GLIB_AVAILABLE_IN_2_44
+void     g_input_stream_read_all_async    (GInputStream          *stream,
+                                           void                  *buffer,
+                                           gsize                  count,
+                                           int                    io_priority,
+                                           GCancellable          *cancellable,
+                                           GAsyncReadyCallback    callback,
+                                           gpointer               user_data);
+GLIB_AVAILABLE_IN_2_44
+gboolean g_input_stream_read_all_finish   (GInputStream          *stream,
+                                           GAsyncResult          *result,
+                                           gsize                 *bytes_read,
+                                           GError               **error);
+
 GLIB_AVAILABLE_IN_2_34
 void     g_input_stream_read_bytes_async  (GInputStream          *stream,
                                           gsize                  count,