X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgoutputstream.c;h=2bec89e7e591ab6079f7b081727ae83c3192205a;hb=2a53b4d0e2c98a14aedf31e38f0ad1fb2e8fe26f;hp=f40af9c1fabbed74fb9f6814e7414804ee2375ae;hpb=d85b722734a6fcfe94032f6113de9e5c190fd7c3;p=platform%2Fupstream%2Fglib.git diff --git a/gio/goutputstream.c b/gio/goutputstream.c index f40af9c..2bec89e 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -13,29 +13,29 @@ * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. + * Public License along with this library; if not, see . * * Author: Alexander Larsson */ #include "config.h" +#include #include "goutputstream.h" #include "gcancellable.h" #include "gasyncresult.h" -#include "gsimpleasyncresult.h" +#include "gtask.h" #include "ginputstream.h" #include "gioerror.h" +#include "gioprivate.h" #include "glibintl.h" - +#include "gpollableoutputstream.h" /** * SECTION:goutputstream * @short_description: Base class for implementing streaming output * @include: gio/gio.h * - * GOutputStream has functions to write to a stream (g_output_stream_write()), + * #GOutputStream has functions to write to a stream (g_output_stream_write()), * to close a stream (g_output_stream_close()) and to flush pending writes * (g_output_stream_flush()). * @@ -45,8 +45,6 @@ * All of these functions have async variants too. **/ -G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); - struct _GOutputStreamPrivate { guint closed : 1; guint pending : 1; @@ -54,6 +52,8 @@ struct _GOutputStreamPrivate { GAsyncReadyCallback outstanding_callback; }; +G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT) + static gssize g_output_stream_real_splice (GOutputStream *stream, GInputStream *source, GOutputStreamSpliceFlags flags, @@ -95,15 +95,17 @@ static void g_output_stream_real_close_async (GOutputStream *s static gboolean g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error); -static gboolean _g_output_stream_close_internal (GOutputStream *stream, +static gboolean g_output_stream_internal_close (GOutputStream *stream, GCancellable *cancellable, GError **error); - -static void -g_output_stream_finalize (GObject *object) -{ - G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object); -} +static void g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); static void g_output_stream_dispose (GObject *object) @@ -122,10 +124,7 @@ static void g_output_stream_class_init (GOutputStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - g_type_class_add_private (klass, sizeof (GOutputStreamPrivate)); - - gobject_class->finalize = g_output_stream_finalize; + gobject_class->dispose = g_output_stream_dispose; klass->splice = g_output_stream_real_splice; @@ -143,9 +142,7 @@ g_output_stream_class_init (GOutputStreamClass *klass) static void g_output_stream_init (GOutputStream *stream) { - stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, - G_TYPE_OUTPUT_STREAM, - GOutputStreamPrivate); + stream->priv = g_output_stream_get_instance_private (stream); } /** @@ -169,7 +166,7 @@ g_output_stream_init (GOutputStream *stream) * is written or an error occurs; 0 is never returned (unless * @count is 0). * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an * operation was partially finished when the operation was cancelled the @@ -179,7 +176,7 @@ g_output_stream_init (GOutputStream *stream) * * Virtual: write_fn * - * Return value: Number of bytes written, or -1 on error + * Returns: Number of bytes written, or -1 on error **/ gssize g_output_stream_write (GOutputStream *stream, @@ -248,11 +245,18 @@ g_output_stream_write (GOutputStream *stream, * On a successful write of @count bytes, %TRUE is returned, and @bytes_written * is set to @count. * - * If there is an error during the operation FALSE is returned and @error - * is set to indicate the error status, @bytes_written is updated to contain - * the number of bytes written into the stream before the error occurred. + * If there is an error during the operation %FALSE is returned and @error + * is set to indicate the error status. + * + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_write(). * - * Return value: %TRUE on success, %FALSE if there was an error + * Returns: %TRUE on success, %FALSE if there was an error **/ gboolean g_output_stream_write_all (GOutputStream *stream, @@ -293,6 +297,142 @@ g_output_stream_write_all (GOutputStream *stream, } /** + * g_output_stream_printf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @...: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @... into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_printf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + ...) +{ + va_list args; + gboolean success; + + va_start (args, format); + success = g_output_stream_vprintf (stream, bytes_written, cancellable, + error, format, args); + va_end (args); + + return success; +} + +/** + * g_output_stream_vprintf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @args: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @args into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_vprintf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + va_list args) +{ + gchar *text; + gboolean success; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + g_return_val_if_fail (format != NULL, FALSE); + + text = g_strdup_vprintf (format, args); + success = g_output_stream_write_all (stream, + text, strlen (text), + bytes_written, cancellable, error); + g_free (text); + + return success; +} + +/** + * g_output_stream_write_bytes: + * @stream: a #GOutputStream. + * @bytes: the #GBytes to write + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore + * + * A wrapper function for g_output_stream_write() which takes a + * #GBytes as input. This can be more convenient for use by language + * bindings or in other cases where the refcounted nature of #GBytes + * is helpful over a bare pointer interface. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * Returns: Number of bytes written, or -1 on error + **/ +gssize +g_output_stream_write_bytes (GOutputStream *stream, + GBytes *bytes, + GCancellable *cancellable, + GError **error) +{ + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + return g_output_stream_write (stream, + data, size, + cancellable, + error); +} + +/** * g_output_stream_flush: * @stream: a #GOutputStream. * @cancellable: (allow-none): optional cancellable object @@ -308,7 +448,7 @@ g_output_stream_write_all (GOutputStream *stream, * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * - * Return value: %TRUE on success, %FALSE on error + * Returns: %TRUE on success, %FALSE on error **/ gboolean g_output_stream_flush (GOutputStream *stream, @@ -465,7 +605,7 @@ g_output_stream_real_splice (GOutputStream *stream, if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) { /* But write errors on close are bad! */ - res = _g_output_stream_close_internal (stream, cancellable, error); + res = g_output_stream_internal_close (stream, cancellable, error); } if (res) @@ -477,9 +617,9 @@ g_output_stream_real_splice (GOutputStream *stream, /* Must always be called inside * g_output_stream_set_pending()/g_output_stream_clear_pending(). */ static gboolean -_g_output_stream_close_internal (GOutputStream *stream, - GCancellable *cancellable, - GError **error) +g_output_stream_internal_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) { GOutputStreamClass *class; gboolean res; @@ -551,7 +691,7 @@ _g_output_stream_close_internal (GOutputStream *stream, * is important to check and report the error to the user, otherwise * there might be a loss of data as all data might not be written. * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * Cancelling a close will still leave the stream closed, but there some streams @@ -559,7 +699,7 @@ _g_output_stream_close_internal (GOutputStream *stream, * cancellation (as with any error) there is no guarantee that all written * data will reach the target. * - * Return value: %TRUE on success, %FALSE on failure + * Returns: %TRUE on success, %FALSE on failure **/ gboolean g_output_stream_close (GOutputStream *stream, @@ -576,7 +716,7 @@ g_output_stream_close (GOutputStream *stream, if (!g_output_stream_set_pending (stream, error)) return FALSE; - res = _g_output_stream_close_internal (stream, cancellable, error); + res = g_output_stream_internal_close (stream, cancellable, error); g_output_stream_clear_pending (stream); @@ -584,96 +724,31 @@ g_output_stream_close (GOutputStream *stream, } static void -async_ready_callback_wrapper (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM (source_object); - - g_output_stream_clear_pending (stream); - if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); - g_object_unref (stream); -} - -typedef struct { - gint io_priority; - GCancellable *cancellable; - GError *flush_error; - gpointer user_data; -} CloseUserData; - -static void -async_ready_close_callback_wrapper (GObject *source_object, +async_ready_write_callback_wrapper (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); - CloseUserData *data = user_data; - - stream->priv->closing = FALSE; - stream->priv->closed = TRUE; - - g_output_stream_clear_pending (stream); - - if (stream->priv->outstanding_callback) - { - if (data->flush_error != NULL) - { - GSimpleAsyncResult *err; - - err = g_simple_async_result_new_take_error (source_object, - stream->priv->outstanding_callback, - data->user_data, - data->flush_error); - data->flush_error = NULL; - - (*stream->priv->outstanding_callback) (source_object, - G_ASYNC_RESULT (err), - data->user_data); - g_object_unref (err); - } - else - { - (*stream->priv->outstanding_callback) (source_object, - res, - data->user_data); - } - } - - g_object_unref (stream); - - if (data->cancellable) - g_object_unref (data->cancellable); - - if (data->flush_error) - g_error_free (data->flush_error); - - g_slice_free (CloseUserData, data); -} - -static void -async_ready_close_flushed_callback_wrapper (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM (source_object); GOutputStreamClass *class; - CloseUserData *data = user_data; - GSimpleAsyncResult *simple; + GTask *task = user_data; + gssize nwrote; + GError *error = NULL; - /* propagate the possible error */ - if (G_IS_SIMPLE_ASYNC_RESULT (res)) + g_output_stream_clear_pending (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + nwrote = -1; + else { - simple = G_SIMPLE_ASYNC_RESULT (res); - g_simple_async_result_propagate_error (simple, &data->flush_error); + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nwrote = class->write_finish (stream, res, &error); } - class = G_OUTPUT_STREAM_GET_CLASS (stream); - - /* we still close, even if there was a flush error */ - class->close_async (stream, data->io_priority, data->cancellable, - async_ready_close_callback_wrapper, user_data); + if (nwrote >= 0) + g_task_return_int (task, nwrote); + else + g_task_return_error (task, error); + g_object_unref (task); } /** @@ -716,7 +791,12 @@ async_ready_close_flushed_callback_wrapper (GObject *source_object, * * For the synchronous, blocking version of this function, see * g_output_stream_write(). - **/ + * + * Note that no copy of @buffer will be made, so it must stay valid + * until @callback is called. See g_output_stream_write_bytes_async() + * for a #GBytes version that will automatically hold a reference to + * the contents (without copying) for the duration of the call. + */ void g_output_stream_write_async (GOutputStream *stream, const void *buffer, @@ -727,49 +807,43 @@ g_output_stream_write_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; GError *error = NULL; + GTask *task; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); g_return_if_fail (buffer != NULL); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_write_async); + g_task_set_priority (task, io_priority); + if (count == 0) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_write_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_int (task, 0); + g_object_unref (task); return; } if (((gssize) count) < 0) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to %s"), - G_STRFUNC); + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Too large count value passed to %s"), + G_STRFUNC); + g_object_unref (task); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); + g_task_return_error (task, error); + g_object_unref (task); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - stream->priv->outstanding_callback = callback; - g_object_ref (stream); class->write_async (stream, buffer, count, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_write_callback_wrapper, task); } /** @@ -788,32 +862,287 @@ g_output_stream_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE); - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + /* @result is always the GTask created by g_output_stream_write_async(); + * we called class->write_finish() from async_ready_write_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); +} + +typedef struct +{ + const guint8 *buffer; + gsize to_write; + gsize bytes_written; +} AsyncWriteAll; + +static void +free_async_write_all (gpointer data) +{ + g_slice_free (AsyncWriteAll, data); +} - if (G_IS_SIMPLE_ASYNC_RESULT (result)) +static void +write_all_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GTask *task = user_data; + AsyncWriteAll *data = g_task_get_task_data (task); + + if (result) { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; + GError *error = NULL; + gssize nwritten; + + nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error); + + if (nwritten == -1) + { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_assert_cmpint (nwritten, <=, data->to_write); + g_warn_if_fail (nwritten > 0); - /* Special case writes of 0 bytes */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async) - return 0; + data->to_write -= nwritten; + data->bytes_written += nwritten; } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->write_finish (stream, result, error); + + if (data->to_write == 0) + { + g_task_return_boolean (task, TRUE); + g_object_unref (task); + } + + else + g_output_stream_write_async (G_OUTPUT_STREAM (stream), + data->buffer + data->bytes_written, + data->to_write, + g_task_get_priority (task), + g_task_get_cancellable (task), + write_all_callback, task); } -typedef struct { - GInputStream *source; - gpointer user_data; - GAsyncReadyCallback callback; -} SpliceUserData; +static void +write_all_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + AsyncWriteAll *data = task_data; + GError *error = NULL; + + if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written, + g_task_get_cancellable (task), &error)) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); +} + +/** + * g_output_stream_write_all_async: + * @stream: A #GOutputStream + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write + * @count: the number of bytes to write + * @io_priority: the io priority of the request + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * Request an asynchronous write of @count bytes from @buffer into + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_write_all_finish() to get the result of the + * operation. + * + * This is the asynchronous version of g_output_stream_write_all(). + * + * Call g_output_stream_write_all_finish() to collect the result. + * + * Any outstanding I/O request with higher priority (lower numerical + * value) will be executed before an outstanding request with lower + * priority. Default priority is %G_PRIORITY_DEFAULT. + * + * Note that no copy of @buffer will be made, so it must stay valid + * until @callback is called. + * + * Since: 2.44 + */ +void +g_output_stream_write_all_async (GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + AsyncWriteAll *data; + GTask *task; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (buffer != NULL || count == 0); + + task = g_task_new (stream, cancellable, callback, user_data); + data = g_slice_new0 (AsyncWriteAll); + data->buffer = buffer; + data->to_write = count; + + g_task_set_task_data (task, data, free_async_write_all); + g_task_set_priority (task, io_priority); + + /* If async writes are going to be handled via the threadpool anyway + * then we may as well do it with a single dispatch instead of + * bouncing in and out. + */ + if (g_output_stream_async_write_is_via_threads (stream)) + { + g_task_run_in_thread (task, write_all_async_thread); + g_object_unref (task); + } + else + write_all_callback (G_OBJECT (stream), NULL, task); +} + +/** + * g_output_stream_write_all_finish: + * @stream: a #GOutputStream + * @result: a #GAsyncResult + * @bytes_written: (out): location to store the number of bytes that was written to the stream + * @error: a #GError location to store the error occurring, or %NULL to ignore. + * + * Finishes an asynchronous stream write operation started with + * g_output_stream_write_all_async(). + * + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_write_async(). + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.44 + **/ +gboolean +g_output_stream_write_all_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error) +{ + GTask *task; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + task = G_TASK (result); + + if (bytes_written) + { + AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task); + + *bytes_written = data->bytes_written; + } + + return g_task_propagate_boolean (task, error); +} + +static void +write_bytes_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GTask *task = user_data; + GError *error = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), + result, &error); + if (nwrote == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, nwrote); + g_object_unref (task); +} + +/** + * g_output_stream_write_bytes_async: + * @stream: A #GOutputStream. + * @bytes: The bytes to write + * @io_priority: the io priority of the request. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * This function is similar to g_output_stream_write_async(), but + * takes a #GBytes as input. Due to the refcounted nature of #GBytes, + * this allows the stream to avoid taking a copy of the data. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write_async(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_write_bytes(). + **/ +void +g_output_stream_write_bytes_async (GOutputStream *stream, + GBytes *bytes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, g_bytes_ref (bytes), + (GDestroyNotify) g_bytes_unref); + + g_output_stream_write_async (stream, + data, size, + io_priority, + cancellable, + write_bytes_callback, + task); +} + +/** + * g_output_stream_write_bytes_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes a stream write-from-#GBytes operation. + * + * Returns: a #gssize containing the number of bytes written to the stream. + **/ +gssize +g_output_stream_write_bytes_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); + g_return_val_if_fail (g_task_is_valid (result, stream), -1); + + return g_task_propagate_int (G_TASK (result), error); +} static void async_ready_splice_callback_wrapper (GObject *source_object, @@ -821,16 +1150,26 @@ async_ready_splice_callback_wrapper (GObject *source_object, gpointer _data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); - SpliceUserData *data = _data; - + GOutputStreamClass *class; + GTask *task = _data; + gssize nspliced; + GError *error = NULL; + g_output_stream_clear_pending (stream); - if (data->callback) - (*data->callback) (source_object, res, data->user_data); - - g_object_unref (stream); - g_object_unref (data->source); - g_free (data); + if (g_async_result_legacy_propagate_error (res, &error)) + nspliced = -1; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nspliced = class->splice_finish (stream, res, &error); + } + + if (nspliced >= 0) + g_task_return_int (task, nspliced); + else + g_task_return_error (task, error); + g_object_unref (task); } /** @@ -861,41 +1200,37 @@ g_output_stream_splice_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - SpliceUserData *data; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); g_return_if_fail (G_IS_INPUT_STREAM (source)); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_splice_async); + g_task_set_priority (task, io_priority); + g_task_set_task_data (task, g_object_ref (source), g_object_unref); + if (g_input_stream_is_closed (source)) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Source stream is already closed")); + g_task_return_new_error (task, + G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Source stream is already closed")); + g_object_unref (task); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); + g_task_return_error (task, error); + g_object_unref (task); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - data = g_new0 (SpliceUserData, 1); - data->callback = callback; - data->user_data = user_data; - data->source = g_object_ref (source); - - g_object_ref (stream); class->splice_async (stream, source, flags, io_priority, cancellable, - async_ready_splice_callback_wrapper, data); + async_ready_splice_callback_wrapper, task); } /** @@ -917,21 +1252,42 @@ g_output_stream_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE); - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + /* @result is always the GTask created by g_output_stream_splice_async(); + * we called class->splice_finish() from async_ready_splice_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); +} - if (G_IS_SIMPLE_ASYNC_RESULT (result)) +static void +async_ready_flush_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + gboolean flushed; + GError *error = NULL; + + g_output_stream_clear_pending (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + flushed = FALSE; + else { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; + class = G_OUTPUT_STREAM_GET_CLASS (stream); + flushed = class->flush_finish (stream, res, &error); } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->splice_finish (stream, result, error); + + if (flushed) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); + g_object_unref (task); } /** @@ -958,38 +1314,33 @@ g_output_stream_flush_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_flush_async); + g_task_set_priority (task, io_priority); + if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); + g_task_return_error (task, error); + g_object_unref (task); return; } - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - class = G_OUTPUT_STREAM_GET_CLASS (stream); if (class->flush_async == NULL) { - simple = g_simple_async_result_new (G_OBJECT (stream), - async_ready_callback_wrapper, - user_data, - g_output_stream_flush_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } class->flush_async (stream, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_flush_callback_wrapper, task); } /** @@ -1008,27 +1359,94 @@ g_output_stream_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *klass; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE); - if (G_IS_SIMPLE_ASYNC_RESULT (result)) + /* @result is always the GTask created by g_output_stream_flush_async(); + * we called class->flush_finish() from async_ready_flush_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); +} + + +static void +async_ready_close_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = g_task_get_task_data (task); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; + + if (!error && !g_async_result_legacy_propagate_error (res, &error)) { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; + class = G_OUTPUT_STREAM_GET_CLASS (stream); - /* Special case default implementation */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async) - return TRUE; + class->close_finish (stream, res, + error ? NULL : &error); } - klass = G_OUTPUT_STREAM_GET_CLASS (stream); - return klass->flush_finish (stream, result, error); + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); + g_object_unref (task); } +static void +async_ready_close_flushed_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = NULL; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + if (!g_async_result_legacy_propagate_error (res, &error)) + { + class->flush_finish (stream, res, &error); + } + + /* propagate the possible error */ + if (error) + g_task_set_task_data (task, error, NULL); + + /* we still close, even if there was a flush error */ + class->close_async (stream, + g_task_get_priority (task), + g_task_get_cancellable (task), + async_ready_close_callback_wrapper, task); +} + +static void +real_close_async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GTask *task = user_data; + GError *error = NULL; + gboolean ret; + + g_output_stream_clear_pending (stream); + + ret = g_output_stream_internal_close_finish (stream, res, &error); + + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, ret); + + g_object_unref (task); +} /** * g_output_stream_close_async: @@ -1056,45 +1474,52 @@ g_output_stream_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; - CloseUserData *data; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); - if (stream->priv->closed) + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_close_async); + g_task_set_priority (task, io_priority); + + if (!g_output_stream_set_pending (stream, &error)) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_close_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_error (task, error); + g_object_unref (task); return; } - if (!g_output_stream_set_pending (stream, &error)) + g_output_stream_internal_close_async (stream, io_priority, cancellable, + real_close_async_cb, task); +} + +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). + */ +void +g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task; + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_internal_close_async); + g_task_set_priority (task, io_priority); + + if (stream->priv->closed) { - g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } - + class = G_OUTPUT_STREAM_GET_CLASS (stream); stream->priv->closing = TRUE; - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - - data = g_slice_new0 (CloseUserData); - - if (cancellable != NULL) - data->cancellable = g_object_ref (cancellable); - - data->io_priority = io_priority; - data->user_data = user_data; /* Call close_async directly if there is no need to flush, or if the flush can be done sync (in the output stream async close thread) */ @@ -1103,17 +1528,29 @@ g_output_stream_close_async (GOutputStream *stream, (class->flush == NULL || class->close_async == g_output_stream_real_close_async))) { class->close_async (stream, io_priority, cancellable, - async_ready_close_callback_wrapper, data); + async_ready_close_callback_wrapper, task); } else { /* First do an async flush, then do the async close in the callback wrapper (see async_ready_close_flushed_callback_wrapper) */ class->flush_async (stream, io_priority, cancellable, - async_ready_close_flushed_callback_wrapper, data); + async_ready_close_flushed_callback_wrapper, task); } } +static gboolean +g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); +} + /** * g_output_stream_close_finish: * @stream: a #GOutputStream. @@ -1130,25 +1567,14 @@ g_output_stream_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE); - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; - - /* Special case already closed */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async) - return TRUE; - } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->close_finish (stream, result, error); + /* @result is always the GTask created by g_output_stream_close_async(); + * we called class->close_finish() from async_ready_close_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); } /** @@ -1214,7 +1640,7 @@ g_output_stream_has_pending (GOutputStream *stream) * already set or @stream is closed, it will return %FALSE and set * @error. * - * Return value: %TRUE if pending was previously unset and is now set. + * Returns: %TRUE if pending was previously unset and is now set. **/ gboolean g_output_stream_set_pending (GOutputStream *stream, @@ -1257,6 +1683,28 @@ g_output_stream_clear_pending (GOutputStream *stream) stream->priv->pending = FALSE; } +/** + * g_output_stream_async_write_is_via_threads: + * @stream: a #GOutputStream. + * + * Checks if an ouput stream's write_async function uses threads. + * + * Returns: %TRUE if @stream's write_async function uses threads. + **/ +gboolean +g_output_stream_async_write_is_via_threads (GOutputStream *stream) +{ + GOutputStreamClass *class; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + return (class->write_async == g_output_stream_real_write_async && + !(G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))); +} + /******************************************** * Default implementation of async ops * @@ -1269,20 +1717,77 @@ typedef struct { } WriteData; static void -write_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +free_write_data (WriteData *op) { - WriteData *op; + g_slice_free (WriteData, op); +} + +static void +write_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + WriteData *op = task_data; GOutputStreamClass *class; GError *error = NULL; + gssize count_written; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + count_written = class->write_fn (stream, op->buffer, op->count_requested, + cancellable, &error); + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); +} + +static void write_async_pollable (GPollableOutputStream *stream, + GTask *task); + +static gboolean +write_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GTask *task = user_data; - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (res); - op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested, - cancellable, &error); - if (op->count_written == -1) - g_simple_async_result_take_error (res, error); + write_async_pollable (stream, task); + return FALSE; +} + +static void +write_async_pollable (GPollableOutputStream *stream, + GTask *task) +{ + GError *error = NULL; + WriteData *op = g_task_get_task_data (task); + gssize count_written; + + if (g_task_return_error_if_cancelled (task)) + return; + + count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, op->buffer, op->count_requested, &error); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + GSource *source; + + g_error_free (error); + + source = g_pollable_output_stream_create_source (stream, + g_task_get_cancellable (task)); + g_task_attach_source (task, source, + (GSourceFunc) write_async_pollable_ready); + g_source_unref (source); + return; + } + + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); } static void @@ -1294,17 +1799,21 @@ g_output_stream_real_write_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; WriteData *op; - op = g_new0 (WriteData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + op = g_slice_new0 (WriteData); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_check_cancellable (task, FALSE); + g_task_set_task_data (task, op, (GDestroyNotify) free_write_data); op->buffer = buffer; op->count_requested = count; - - g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); - g_object_unref (res); + + if (!g_output_stream_async_write_is_via_threads (stream)) + write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task); + else + g_task_run_in_thread (task, write_async_thread); + g_object_unref (task); } static gssize @@ -1312,41 +1821,207 @@ g_output_stream_real_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - WriteData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->count_written; + return g_task_propagate_int (G_TASK (result), error); } typedef struct { GInputStream *source; GOutputStreamSpliceFlags flags; - gssize bytes_copied; + gssize n_read; + gssize n_written; + gsize bytes_copied; + GError *error; + guint8 *buffer; } SpliceData; static void -splice_async_thread (GSimpleAsyncResult *result, - GObject *object, - GCancellable *cancellable) +free_splice_data (SpliceData *op) { - SpliceData *op; + g_clear_pointer (&op->buffer, g_free); + g_object_unref (op->source); + g_clear_error (&op->error); + g_free (op); +} + +static void +real_splice_async_complete_cb (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE && + !g_input_stream_is_closed (op->source)) + return; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET && + !g_output_stream_is_closed (g_task_get_source_object (task))) + return; + + if (op->error != NULL) + { + g_task_return_error (task, op->error); + op->error = NULL; + } + else + { + g_task_return_int (task, op->bytes_copied); + } + + g_object_unref (task); +} + +static void +real_splice_async_close_input_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + + g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_close_output_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + GError **error = (op->error == NULL) ? &op->error : NULL; + + g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_complete (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + gboolean done = TRUE; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) + { + done = FALSE; + g_input_stream_close_async (op->source, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_input_cb, task); + } + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) + { + done = FALSE; + g_output_stream_internal_close_async (g_task_get_source_object (task), + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_output_cb, + task); + } + + if (done) + real_splice_async_complete_cb (task); +} + +static void real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data); + +static void +real_splice_async_write_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error); + + if (ret == -1) + { + real_splice_async_complete (task); + return; + } + + op->n_written += ret; + op->bytes_copied += ret; + if (op->bytes_copied > G_MAXSSIZE) + op->bytes_copied = G_MAXSSIZE; + + if (op->n_written < op->n_read) + { + class->write_async (g_task_get_source_object (task), + op->buffer + op->n_written, + op->n_read - op->n_written, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); + return; + } + + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); +} + +static void +real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = g_input_stream_read_finish (op->source, res, &op->error); + if (ret == -1 || ret == 0) + { + real_splice_async_complete (task); + return; + } + + op->n_read = ret; + op->n_written = 0; + + class->write_async (g_task_get_source_object (task), op->buffer, + op->n_read, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); +} + +static void +splice_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + SpliceData *op = task_data; GOutputStreamClass *class; GError *error = NULL; - GOutputStream *stream; + gssize bytes_copied; - stream = G_OUTPUT_STREAM (object); - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (result); + class = G_OUTPUT_STREAM_GET_CLASS (stream); - op->bytes_copied = class->splice (stream, - op->source, - op->flags, - cancellable, - &error); - if (op->bytes_copied == -1) - g_simple_async_result_take_error (result, error); + bytes_copied = class->splice (stream, + op->source, + op->flags, + cancellable, + &error); + if (bytes_copied == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, bytes_copied); } static void @@ -1358,20 +2033,29 @@ g_output_stream_real_splice_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; SpliceData *op; op = g_new0 (SpliceData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data); op->flags = flags; - op->source = source; + op->source = g_object_ref (source); - /* TODO: In the case where both source and destintion have - non-threadbased async calls we can use a true async copy here */ - - g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable); - g_object_unref (res); + if (g_input_stream_async_read_is_via_threads (source) && + g_output_stream_async_write_is_via_threads (stream)) + { + g_task_run_in_thread (task, splice_async_thread); + g_object_unref (task); + } + else + { + op->buffer = g_malloc (8192); + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); + } } static gssize @@ -1379,31 +2063,32 @@ g_output_stream_real_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - SpliceData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->bytes_copied; + return g_task_propagate_int (G_TASK (result), error); } static void -flush_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +flush_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; gboolean result; GError *error = NULL; - class = G_OUTPUT_STREAM_GET_CLASS (object); + class = G_OUTPUT_STREAM_GET_CLASS (stream); result = TRUE; if (class->flush) - result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); + result = class->flush (stream, cancellable, &error); - if (!result) - g_simple_async_result_take_error (res, error); + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1413,12 +2098,12 @@ g_output_stream_real_flush_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - - g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, flush_async_thread); + g_object_unref (task); } static gboolean @@ -1426,45 +2111,52 @@ g_output_stream_real_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - return TRUE; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); } static void -close_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +close_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; GError *error = NULL; gboolean result = TRUE; - class = G_OUTPUT_STREAM_GET_CLASS (object); + class = G_OUTPUT_STREAM_GET_CLASS (stream); /* Do a flush here if there is a flush function, and we did not have to do - an async flush before (see g_output_stream_close_async) */ + * an async flush before (see g_output_stream_close_async) + */ if (class->flush != NULL && (class->flush_async == NULL || class->flush_async == g_output_stream_real_flush_async)) { - result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); + result = class->flush (stream, cancellable, &error); } /* Auto handling of cancelation disabled, and ignore cancellation, since we want to close things anyway, although possibly in a quick-n-dirty way. At least we never want to leak open handles */ - + if (class->close_fn) { /* Make sure to close, even if the flush failed (see sync close) */ if (!result) - class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL); + class->close_fn (stream, cancellable, NULL); else - result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error); - - if (!result) - g_simple_async_result_take_error (res, error); + result = class->close_fn (stream, cancellable, &error); } + + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1474,14 +2166,12 @@ g_output_stream_real_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; - - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async); + GTask *task; - g_simple_async_result_set_handle_cancellation (res, FALSE); - - g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, close_async_thread); + g_object_unref (task); } static gboolean @@ -1489,7 +2179,7 @@ g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async); - return TRUE; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); }