From 129e86cf2c2920bec69085a425ffd1af816a3f04 Mon Sep 17 00:00:00 2001 From: Ryan Lortie Date: Wed, 28 Jan 2009 16:39:39 +0000 Subject: [PATCH] =?utf8?q?Bug=20568575=20=E2=80=93=20=5Fasync=20functions?= =?utf8?q?=20for=20GDataInputStream?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit 2009-01-28 Ryan Lortie Bug 568575 – _async functions for GDataInputStream * gdatainputstream.h: * gdatainputstream.c: add _async versions of read_line and read_until. * gio.symbols: * ../docs/reference/gio/gio-sections.txt: add new functions * tests/sleepy-stream.c: new test case for async read line * tests/Makefile.am: add new test svn path=/trunk/; revision=7835 --- docs/reference/gio/gio-sections.txt | 4 + gio/ChangeLog | 11 ++ gio/gdatainputstream.c | 296 +++++++++++++++++++++++++++++++++++- gio/gdatainputstream.h | 95 +++++++----- gio/gio.symbols | 2 + gio/tests/Makefile.am | 4 + gio/tests/sleepy-stream.c | 294 +++++++++++++++++++++++++++++++++++ 7 files changed, 667 insertions(+), 39 deletions(-) create mode 100644 gio/tests/sleepy-stream.c diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt index 6018131..391dfaf 100644 --- a/docs/reference/gio/gio-sections.txt +++ b/docs/reference/gio/gio-sections.txt @@ -593,7 +593,11 @@ g_data_input_stream_read_uint32 g_data_input_stream_read_int64 g_data_input_stream_read_uint64 g_data_input_stream_read_line +g_data_input_stream_read_line_async +g_data_input_stream_read_line_finish g_data_input_stream_read_until +g_data_input_stream_read_until_async +g_data_input_stream_read_until_finish GDataInputStreamClass G_DATA_INPUT_STREAM diff --git a/gio/ChangeLog b/gio/ChangeLog index a3679d5..778b817 100644 --- a/gio/ChangeLog +++ b/gio/ChangeLog @@ -1,3 +1,14 @@ +2009-01-28 Ryan Lortie + + Bug 568575 – _async functions for GDataInputStream + + * gdatainputstream.h: + * gdatainputstream.c: add _async versions of read_line and read_until. + * gio.symbols: + * ../docs/reference/gio/gio-sections.txt: add new functions + * tests/sleepy-stream.c: new test case for async read line + * tests/Makefile.am: add new test + 2009-01-22 Ryan Lortie Bug 568723 – g_buffered_input_stream_fill_async doesn't take count == -1 diff --git a/gio/gdatainputstream.c b/gio/gdatainputstream.c index 6b0a978..387fede 100644 --- a/gio/gdatainputstream.c +++ b/gio/gdatainputstream.c @@ -2,6 +2,7 @@ * * Copyright (C) 2006-2007 Red Hat, Inc. * Copyright (C) 2007 Jürg Billeter + * Copyright © 2009 Codethink Limited * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -23,6 +24,8 @@ #include "config.h" #include "gdatainputstream.h" +#include "gsimpleasyncresult.h" +#include "gcancellable.h" #include "gioenumtypes.h" #include "gioerror.h" #include "glibintl.h" @@ -808,7 +811,6 @@ g_data_input_stream_read_line (GDataInputStream *stream, return line; } - static gssize scan_for_chars (GDataInputStream *stream, gsize *checked_out, @@ -928,5 +930,297 @@ g_data_input_stream_read_until (GDataInputStream *stream, return data_until; } +typedef struct +{ + GDataInputStream *stream; + GSimpleAsyncResult *simple; + gboolean last_saw_cr; + gsize checked; + gint io_priority; + GCancellable *cancellable; + + gchar *stop_chars; + gchar *line; + gsize length; +} GDataInputStreamReadData; + +static void +g_data_input_stream_read_complete (GDataInputStreamReadData *data, + gsize read_length, + gsize skip_length, + gboolean need_idle_dispatch) +{ + if (read_length || skip_length) + { + gssize bytes; + + data->length = read_length; + data->line = g_malloc (read_length + 1); + data->line[read_length] = '\0'; + + /* we already checked the buffer. this shouldn't fail. */ + bytes = g_input_stream_read (G_INPUT_STREAM (data->stream), + data->line, read_length, NULL, NULL); + g_assert_cmpint (bytes, ==, read_length); + + bytes = g_input_stream_skip (G_INPUT_STREAM (data->stream), + skip_length, NULL, NULL); + g_assert_cmpint (bytes, ==, skip_length); + } + + if (need_idle_dispatch) + g_simple_async_result_complete_in_idle (data->simple); + else + g_simple_async_result_complete (data->simple); + + g_object_unref (data->simple); +} + +static void +g_data_input_stream_read_line_ready (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + GDataInputStreamReadData *data = user_data; + gssize found_pos; + gint newline_len; + + if (result) + /* this is a callback. finish the async call. */ + { + GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream); + GError *error = NULL; + gssize bytes; + + bytes = g_buffered_input_stream_fill_finish (buffer, result, &error); + + if (bytes <= 0) + { + if (bytes < 0) + /* stream error. */ + { + g_simple_async_result_set_from_error (data->simple, error); + g_error_free (error); + data->checked = 0; + } + + g_data_input_stream_read_complete (data, data->checked, 0, FALSE); + return; + } + + /* only proceed if we got more bytes... */ + } + + if (data->stop_chars) + { + found_pos = scan_for_chars (data->stream, + &data->checked, + data->stop_chars); + newline_len = 0; + } + else + found_pos = scan_for_newline (data->stream, &data->checked, + &data->last_saw_cr, &newline_len); + + if (found_pos == -1) + /* didn't find a full line; need to buffer some more bytes */ + { + GBufferedInputStream *buffer = G_BUFFERED_INPUT_STREAM (data->stream); + gsize size; + + size = g_buffered_input_stream_get_buffer_size (buffer); + + if (g_buffered_input_stream_get_available (buffer) == size) + /* need to grow the buffer */ + g_buffered_input_stream_set_buffer_size (buffer, size * 2); + + /* try again */ + g_buffered_input_stream_fill_async (buffer, -1, data->io_priority, + data->cancellable, + g_data_input_stream_read_line_ready, + user_data); + } + else + { + /* read the line and the EOL. no error is possible. */ + g_data_input_stream_read_complete (data, found_pos, + newline_len, result == NULL); + } +} + +static void +g_data_input_stream_read_data_free (gpointer user_data) +{ + GDataInputStreamReadData *data = user_data; + + /* we don't hold a ref to ->simple because it keeps a ref to us. + * we are called because it is being finalized. + */ + + g_free (data->stop_chars); + if (data->cancellable) + g_object_unref (data->cancellable); + g_free (data->line); + g_slice_free (GDataInputStreamReadData, data); +} + +static void +g_data_input_stream_read_async (GDataInputStream *stream, + const gchar *stop_chars, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data, + gpointer source_tag) +{ + GDataInputStreamReadData *data; + + data = g_slice_new (GDataInputStreamReadData); + data->stream = stream; + if (cancellable) + g_object_ref (cancellable); + data->cancellable = cancellable; + data->stop_chars = g_strdup (stop_chars); + data->io_priority = io_priority; + data->last_saw_cr = FALSE; + data->checked = 0; + data->line = NULL; + + data->simple = g_simple_async_result_new (G_OBJECT (stream), callback, + user_data, source_tag); + g_simple_async_result_set_op_res_gpointer (data->simple, data, + g_data_input_stream_read_data_free); + g_data_input_stream_read_line_ready (NULL, NULL, data); +} + +static gchar * +g_data_input_stream_read_finish (GDataInputStream *stream, + GAsyncResult *result, + gsize *length, + GError **error) +{ + GDataInputStreamReadData *data; + GSimpleAsyncResult *simple; + gchar *line; + + simple = G_SIMPLE_ASYNC_RESULT (result); + + if (g_simple_async_result_propagate_error (simple, error)) + return NULL; + + data = g_simple_async_result_get_op_res_gpointer (simple); + + line = data->line; + data->line = NULL; + + if (length && line) + *length = data->length; + + return line; +} + +/** + * g_data_input_stream_read_line_async: + * @stream: a given #GDataInputStream. + * @io_priority: the I/O priority + * of the request. + * @cancellable: optional #GCancellable object, %NULL to ignore. + * @callback: callback to call when the request is satisfied. + * @user_data: the data to pass to callback function. + * + * The asynchronous version of g_data_input_stream_read_line(). It is + * an error to have two outstanding calls to this function. + **/ +void +g_data_input_stream_read_line_async (GDataInputStream *stream, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + g_return_if_fail (G_IS_DATA_INPUT_STREAM (stream)); + g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); + + g_data_input_stream_read_async (stream, NULL, io_priority, + cancellable, callback, user_data, + g_data_input_stream_read_line_async); +} + +/** + * g_data_input_stream_read_until_async: + * @stream: a given #GDataInputStream. + * @stop_chars: characters to terminate the read. + * @io_priority: the I/O priority + * of the request. + * @cancellable: optional #GCancellable object, %NULL to ignore. + * @callback: callback to call when the request is satisfied. + * @user_data: the data to pass to callback function. + * + * The asynchronous version of g_data_input_stream_read_until(). It is + * an error to have two outstanding calls to this function. + **/ +void +g_data_input_stream_read_until_async (GDataInputStream *stream, + const gchar *stop_chars, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + g_return_if_fail (G_IS_DATA_INPUT_STREAM (stream)); + g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable)); + g_return_if_fail (stop_chars != NULL); + + g_data_input_stream_read_async (stream, stop_chars, io_priority, + cancellable, callback, user_data, + g_data_input_stream_read_until_async); +} + +/** + * g_data_input_stream_read_line_finish: + * @stream: a given #GDataInputStream. + * @result: the #GAsyncResult that was provided to the callback. + * @length: a #gsize to get the length of the data read in. + * @error: #GError for error reporting. + * + * Finish an asynchronous call started by + * g_data_input_stream_read_line_async(). + **/ +gchar * +g_data_input_stream_read_line_finish (GDataInputStream *stream, + GAsyncResult *result, + gsize *length, + GError **error) +{ + g_return_val_if_fail ( + g_simple_async_result_is_valid (result, G_OBJECT (stream), + g_data_input_stream_read_line_async), NULL); + + return g_data_input_stream_read_finish (stream, result, length, error); +} + +/** + * g_data_input_stream_read_until_finish: + * @stream: a given #GDataInputStream. + * @result: the #GAsyncResult that was provided to the callback. + * @length: a #gsize to get the length of the data read in. + * @error: #GError for error reporting. + * + * Finish an asynchronous call started by + * g_data_input_stream_read_until_async(). + **/ +gchar * +g_data_input_stream_read_until_finish (GDataInputStream *stream, + GAsyncResult *result, + gsize *length, + GError **error) +{ + g_return_val_if_fail ( + g_simple_async_result_is_valid (result, G_OBJECT (stream), + g_data_input_stream_read_until_async), NULL); + + return g_data_input_stream_read_finish (stream, result, length, error); +} + #define __G_DATA_INPUT_STREAM_C__ #include "gioaliasdef.c" diff --git a/gio/gdatainputstream.h b/gio/gdatainputstream.h index 45febf8..c01ef87 100644 --- a/gio/gdatainputstream.h +++ b/gio/gdatainputstream.h @@ -69,45 +69,64 @@ struct _GDataInputStreamClass void (*_g_reserved5) (void); }; -GType g_data_input_stream_get_type (void) G_GNUC_CONST; -GDataInputStream * g_data_input_stream_new (GInputStream *base_stream); +GType g_data_input_stream_get_type (void) G_GNUC_CONST; +GDataInputStream * g_data_input_stream_new (GInputStream *base_stream); -void g_data_input_stream_set_byte_order (GDataInputStream *stream, - GDataStreamByteOrder order); -GDataStreamByteOrder g_data_input_stream_get_byte_order (GDataInputStream *stream); -void g_data_input_stream_set_newline_type (GDataInputStream *stream, - GDataStreamNewlineType type); -GDataStreamNewlineType g_data_input_stream_get_newline_type (GDataInputStream *stream); -guchar g_data_input_stream_read_byte (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -gint16 g_data_input_stream_read_int16 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -guint16 g_data_input_stream_read_uint16 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -gint32 g_data_input_stream_read_int32 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -guint32 g_data_input_stream_read_uint32 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -gint64 g_data_input_stream_read_int64 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -guint64 g_data_input_stream_read_uint64 (GDataInputStream *stream, - GCancellable *cancellable, - GError **error); -char * g_data_input_stream_read_line (GDataInputStream *stream, - gsize *length, - GCancellable *cancellable, - GError **error); -char * g_data_input_stream_read_until (GDataInputStream *stream, - const gchar *stop_chars, - gsize *length, - GCancellable *cancellable, - GError **error); +void g_data_input_stream_set_byte_order (GDataInputStream *stream, + GDataStreamByteOrder order); +GDataStreamByteOrder g_data_input_stream_get_byte_order (GDataInputStream *stream); +void g_data_input_stream_set_newline_type (GDataInputStream *stream, + GDataStreamNewlineType type); +GDataStreamNewlineType g_data_input_stream_get_newline_type (GDataInputStream *stream); +guchar g_data_input_stream_read_byte (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +gint16 g_data_input_stream_read_int16 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +guint16 g_data_input_stream_read_uint16 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +gint32 g_data_input_stream_read_int32 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +guint32 g_data_input_stream_read_uint32 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +gint64 g_data_input_stream_read_int64 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +guint64 g_data_input_stream_read_uint64 (GDataInputStream *stream, + GCancellable *cancellable, + GError **error); +char * g_data_input_stream_read_line (GDataInputStream *stream, + gsize *length, + GCancellable *cancellable, + GError **error); +void g_data_input_stream_read_line_async (GDataInputStream *stream, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +char * g_data_input_stream_read_line_finish (GDataInputStream *stream, + GAsyncResult *result, + gsize *length, + GError **error); +char * g_data_input_stream_read_until (GDataInputStream *stream, + const gchar *stop_chars, + gsize *length, + GCancellable *cancellable, + GError **error); +void g_data_input_stream_read_until_async (GDataInputStream *stream, + const gchar *stop_chars, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +char * g_data_input_stream_read_until_finish (GDataInputStream *stream, + GAsyncResult *result, + gsize *length, + GError **error); G_END_DECLS diff --git a/gio/gio.symbols b/gio/gio.symbols index 2e8a363..bfeaf60 100644 --- a/gio/gio.symbols +++ b/gio/gio.symbols @@ -169,6 +169,8 @@ g_data_input_stream_read_uint32 g_data_input_stream_read_int64 g_data_input_stream_read_uint64 g_data_input_stream_read_line +g_data_input_stream_read_line_async +g_data_input_stream_read_line_finish g_data_input_stream_read_until #endif #endif diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am index 5e974a4..70a9ae8 100644 --- a/gio/tests/Makefile.am +++ b/gio/tests/Makefile.am @@ -25,6 +25,7 @@ TEST_PROGS += \ data-output-stream \ g-icon \ buffered-input-stream \ + sleepy-stream \ filter-streams \ simple-async-result @@ -69,6 +70,9 @@ unix_streams_LDADD = $(progs_ldadd) \ simple_async_result_SOURCES = simple-async-result.c simple_async_result_LDADD = $(progs_ldadd) +sleepy_stream_SOURCES = sleepy-stream.c +sleepy_stream_LDADD = $(progs_ldadd) + filter_streams_SOURCES = filter-streams.c filter_streams_LDADD = $(progs_ldadd) diff --git a/gio/tests/sleepy-stream.c b/gio/tests/sleepy-stream.c new file mode 100644 index 0000000..3c3cea9 --- /dev/null +++ b/gio/tests/sleepy-stream.c @@ -0,0 +1,294 @@ +/* + * Copyright © 2009 Codethink Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; either version 2 of the licence or (at + * your option) any later version. + * + * See the included COPYING file for more information. + * + * Author: Ryan Lortie + */ + +#include +#include + +#define MAX_PIECE_SIZE 100 +#define MAX_PIECES 60 + +static gchar * +cook_piece (void) +{ + char buffer[MAX_PIECE_SIZE * 2]; + gint symbols, index = 0; + + symbols = g_test_rand_int_range (1, MAX_PIECE_SIZE + 1); + + while (symbols--) + { + gint c = g_test_rand_int_range (0, 30); + + switch (c) + { + case 26: + buffer[index++] = '\n'; + case 27: + buffer[index++] = '\r'; + break; + + case 28: + buffer[index++] = '\r'; + case 29: + buffer[index++] = '\n'; + break; + + default: + buffer[index++] = c + 'a'; + break; + } + + g_assert_cmpint (index, <=, sizeof buffer); + } + + return g_strndup (buffer, index); +} + +static gchar ** +cook_pieces (void) +{ + gchar **array; + gint pieces; + + pieces = g_test_rand_int_range (0, MAX_PIECES + 1); + array = g_new (char *, pieces + 1); + array[pieces] = NULL; + + while (pieces--) + array[pieces] = cook_piece (); + + return array; +} + +typedef struct +{ + GInputStream parent_instance; + + gboolean built_to_fail; + gchar **pieces; + gint index; + + const gchar *current; +} SleepyStream; + +typedef GInputStreamClass SleepyStreamClass; + +G_DEFINE_TYPE (SleepyStream, sleepy_stream, G_TYPE_INPUT_STREAM) + +static gssize +sleepy_stream_read (GInputStream *stream, + void *buffer, + gsize length, + GCancellable *cancellable, + GError **error) +{ + SleepyStream *sleepy = (SleepyStream *) stream; + + if (sleepy->pieces[sleepy->index] == NULL) + { + if (sleepy->built_to_fail) + { + g_set_error (error, 0, 0, "fail"); + return -1; + } + else + return 0; + } + else + { + if (!sleepy->current) + sleepy->current = sleepy->pieces[sleepy->index++]; + + length = MIN (strlen (sleepy->current), length); + memcpy (buffer, sleepy->current, length); + + sleepy->current += length; + if (*sleepy->current == '\0') + sleepy->current = NULL; + + return length; + } +} + +static void +sleepy_stream_init (SleepyStream *sleepy) +{ + sleepy->pieces = cook_pieces (); + sleepy->built_to_fail = FALSE; + sleepy->index = 0; +} + +static void +sleepy_stream_finalize (GObject *object) +{ + SleepyStream *sleepy = (SleepyStream *) object; + + g_strfreev (sleepy->pieces); + G_OBJECT_CLASS (sleepy_stream_parent_class) + ->finalize (object); +} + +static void +sleepy_stream_class_init (SleepyStreamClass *class) +{ + G_OBJECT_CLASS (class)->finalize = sleepy_stream_finalize; + class->read_fn = sleepy_stream_read; + + /* no read_async implementation. + * main thread will sleep while read runs in a worker. + */ +} + +SleepyStream * +sleepy_stream_new (void) +{ + return g_object_new (sleepy_stream_get_type (), NULL); +} + +static gboolean +read_line (GDataInputStream *stream, + GString *string, + const gchar *eol, + GError **error) +{ + gsize length; + int eol_len; + char *str; + + eol_len = 1 + (eol[1] != '\0'); + + str = g_data_input_stream_read_line (stream, &length, NULL, error); + + if (str == NULL) + return FALSE; + + g_assert (strstr (str, eol) == NULL); + g_assert (strlen (str) == length); + + g_string_append (string, str); + g_string_append (string, eol); + g_free (str); + + return TRUE; +} + +static void +build_comparison (GString *str, + SleepyStream *stream) +{ + /* build this for comparison */ + gint i; + + for (i = 0; stream->pieces[i]; i++) + g_string_append (str, stream->pieces[i]); + + if (str->len && str->str[str->len - 1] != '\n') + g_string_append_c (str, '\n'); +} + + +void +test (void) +{ + SleepyStream *stream = sleepy_stream_new (); + GDataInputStream *data; + GError *error = NULL; + GString *one; + GString *two; + + one = g_string_new (NULL); + two = g_string_new (NULL); + + data = g_data_input_stream_new (G_INPUT_STREAM (stream)); + g_data_input_stream_set_newline_type (data, G_DATA_STREAM_NEWLINE_TYPE_LF); + build_comparison (one, stream); + + while (read_line (data, two, "\n", &error)); + + g_assert_cmpstr (one->str, ==, two->str); + g_string_free (one, TRUE); + g_string_free (two, TRUE); + g_object_unref (stream); + g_object_unref (data); +} + +static GDataInputStream *data; +static GString *one, *two; +static GMainLoop *loop; +static const gchar *eol; + +static void +asynch_ready (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + GError *error = NULL; + gsize length; + gchar *str; + + g_assert (data == G_DATA_INPUT_STREAM (object)); + + str = g_data_input_stream_read_line_finish (data, result, &length, &error); + + if (str == NULL) + { + g_main_loop_quit (loop); + if (error) + g_error_free (error); + } + else + { + g_assert (length == strlen (str)); + g_string_append (two, str); + g_string_append (two, eol); + g_free (str); + + /* MOAR!! */ + g_data_input_stream_read_line_async (data, 0, NULL, asynch_ready, NULL); + } +} + + +void +asynch (void) +{ + SleepyStream *sleepy = sleepy_stream_new (); + + data = g_data_input_stream_new (G_INPUT_STREAM (sleepy)); + one = g_string_new (NULL); + two = g_string_new (NULL); + eol = "\n"; + + build_comparison (one, sleepy); + g_data_input_stream_read_line_async (data, 0, NULL, asynch_ready, NULL); + g_main_loop_run (loop = g_main_loop_new (NULL, FALSE)); + + g_assert_cmpstr (one->str, ==, two->str); + g_string_free (one, TRUE); + g_string_free (two, TRUE); + g_object_unref (sleepy); + g_object_unref (data); +} + +int +main (int argc, char **argv) +{ + g_test_init (&argc, &argv, NULL); + g_test_bug_base ("http://bugzilla.gnome.org/"); + + g_type_init (); + g_test_add_func ("/filter-stream/input", test); + g_test_add_func ("/filter-stream/async", asynch); + + return g_test_run(); +} -- 2.7.4