X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgoutputstream.c;h=2bec89e7e591ab6079f7b081727ae83c3192205a;hb=33b9935efc82f8cc4747dfea2743129dfc418d19;hp=38d5419c4f52d5dacc488d094c78df8128c64fa0;hpb=afe3324fcac8ea2a6b6007c938d7974aa923c0d3;p=platform%2Fupstream%2Fglib.git diff --git a/gio/goutputstream.c b/gio/goutputstream.c index 38d5419..2bec89e 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -13,30 +13,29 @@ * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. + * Public License along with this library; if not, see . * * Author: Alexander Larsson */ #include "config.h" +#include #include "goutputstream.h" #include "gcancellable.h" #include "gasyncresult.h" -#include "gsimpleasyncresult.h" +#include "gtask.h" #include "ginputstream.h" #include "gioerror.h" +#include "gioprivate.h" #include "glibintl.h" - -#include "gioalias.h" +#include "gpollableoutputstream.h" /** * SECTION:goutputstream * @short_description: Base class for implementing streaming output * @include: gio/gio.h * - * GOutputStream has functions to write to a stream (g_output_stream_write()), + * #GOutputStream has functions to write to a stream (g_output_stream_write()), * to close a stream (g_output_stream_close()) and to flush pending writes * (g_output_stream_flush()). * @@ -46,8 +45,6 @@ * All of these functions have async variants too. **/ -G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); - struct _GOutputStreamPrivate { guint closed : 1; guint pending : 1; @@ -55,6 +52,8 @@ struct _GOutputStreamPrivate { GAsyncReadyCallback outstanding_callback; }; +G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT) + static gssize g_output_stream_real_splice (GOutputStream *stream, GInputStream *source, GOutputStreamSpliceFlags flags, @@ -96,12 +95,17 @@ static void g_output_stream_real_close_async (GOutputStream *s static gboolean g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error); - -static void -g_output_stream_finalize (GObject *object) -{ - G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object); -} +static gboolean g_output_stream_internal_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error); +static void g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); static void g_output_stream_dispose (GObject *object) @@ -120,10 +124,7 @@ static void g_output_stream_class_init (GOutputStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - g_type_class_add_private (klass, sizeof (GOutputStreamPrivate)); - - gobject_class->finalize = g_output_stream_finalize; + gobject_class->dispose = g_output_stream_dispose; klass->splice = g_output_stream_real_splice; @@ -141,40 +142,41 @@ g_output_stream_class_init (GOutputStreamClass *klass) static void g_output_stream_init (GOutputStream *stream) { - stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, - G_TYPE_OUTPUT_STREAM, - GOutputStreamPrivate); + stream->priv = g_output_stream_get_instance_private (stream); } /** * g_output_stream_write: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. * - * If count is zero returns zero and does nothing. A value of @count + * If count is 0, returns 0 and does nothing. A value of @count * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. * * On success, the number of bytes written to the stream is returned. * It is not an error if this is not the same as the requested size, as it - * can happen e.g. on a partial i/o error, or if there is not enough - * storage in the stream. All writes either block until at least one byte - * is written, so zero is never returned (unless @count is zero). + * can happen e.g. on a partial I/O error, or if there is not enough + * storage in the stream. All writes block until at least one byte + * is written or an error occurs; 0 is never returned (unless + * @count is 0). * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation - * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an * operation was partially finished when the operation was cancelled the * partial result will be returned, without an error. * * On error -1 is returned and @error is set accordingly. * - * Return value: Number of bytes written, or -1 on error + * Virtual: write_fn + * + * Returns: Number of bytes written, or -1 on error **/ gssize g_output_stream_write (GOutputStream *stream, @@ -227,12 +229,12 @@ g_output_stream_write (GOutputStream *stream, /** * g_output_stream_write_all: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @bytes_written: location to store the number of bytes that was + * @bytes_written: (out): location to store the number of bytes that was * written to the stream - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. @@ -243,11 +245,18 @@ g_output_stream_write (GOutputStream *stream, * On a successful write of @count bytes, %TRUE is returned, and @bytes_written * is set to @count. * - * If there is an error during the operation FALSE is returned and @error - * is set to indicate the error status, @bytes_written is updated to contain - * the number of bytes written into the stream before the error occurred. + * If there is an error during the operation %FALSE is returned and @error + * is set to indicate the error status. * - * Return value: %TRUE on success, %FALSE if there was an error + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_write(). + * + * Returns: %TRUE on success, %FALSE if there was an error **/ gboolean g_output_stream_write_all (GOutputStream *stream, @@ -288,13 +297,150 @@ g_output_stream_write_all (GOutputStream *stream, } /** + * g_output_stream_printf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @...: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @... into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_printf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + ...) +{ + va_list args; + gboolean success; + + va_start (args, format); + success = g_output_stream_vprintf (stream, bytes_written, cancellable, + error, format, args); + va_end (args); + + return success; +} + +/** + * g_output_stream_vprintf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @args: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @args into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_vprintf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + va_list args) +{ + gchar *text; + gboolean success; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + g_return_val_if_fail (format != NULL, FALSE); + + text = g_strdup_vprintf (format, args); + success = g_output_stream_write_all (stream, + text, strlen (text), + bytes_written, cancellable, error); + g_free (text); + + return success; +} + +/** + * g_output_stream_write_bytes: + * @stream: a #GOutputStream. + * @bytes: the #GBytes to write + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore + * + * A wrapper function for g_output_stream_write() which takes a + * #GBytes as input. This can be more convenient for use by language + * bindings or in other cases where the refcounted nature of #GBytes + * is helpful over a bare pointer interface. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * Returns: Number of bytes written, or -1 on error + **/ +gssize +g_output_stream_write_bytes (GOutputStream *stream, + GBytes *bytes, + GCancellable *cancellable, + GError **error) +{ + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + return g_output_stream_write (stream, + data, size, + cancellable, + error); +} + +/** * g_output_stream_flush: * @stream: a #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * - * Flushed any outstanding buffers in the stream. Will block during - * the operation. Closing the stream will implicitly cause a flush. + * Forces a write of all user-space buffered data for the given + * @stream. Will block during the operation. Closing the stream will + * implicitly cause a flush. * * This function is optional for inherited classes. * @@ -302,7 +448,7 @@ g_output_stream_write_all (GOutputStream *stream, * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * - * Return value: %TRUE on success, %FALSE on error + * Returns: %TRUE on success, %FALSE on error **/ gboolean g_output_stream_flush (GOutputStream *stream, @@ -341,14 +487,17 @@ g_output_stream_flush (GOutputStream *stream, * @stream: a #GOutputStream. * @source: a #GInputStream. * @flags: a set of #GOutputStreamSpliceFlags. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: a #GError location to store the error occuring, or %NULL to + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Splices an input stream into an output stream. * * Returns: a #gssize containing the size of the data spliced, or - * -1 if an error occurred. + * -1 if an error occurred. Note that if the number of bytes + * spliced is greater than %G_MAXSSIZE, then that will be + * returned, and there is no way to determine the actual number + * of bytes spliced. **/ gssize g_output_stream_splice (GOutputStream *stream, @@ -397,7 +546,7 @@ g_output_stream_real_splice (GOutputStream *stream, { GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream); gssize n_read, n_written; - gssize bytes_copied; + gsize bytes_copied; char buffer[8192], *p; gboolean res; @@ -437,6 +586,9 @@ g_output_stream_real_splice (GOutputStream *stream, n_read -= n_written; bytes_copied += n_written; } + + if (bytes_copied > G_MAXSSIZE) + bytes_copied = G_MAXSSIZE; } while (res); @@ -453,9 +605,7 @@ g_output_stream_real_splice (GOutputStream *stream, if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) { /* But write errors on close are bad! */ - if (class->close_fn && - !class->close_fn (stream, cancellable, error)) - res = FALSE; + res = g_output_stream_internal_close (stream, cancellable, error); } if (res) @@ -464,12 +614,60 @@ g_output_stream_real_splice (GOutputStream *stream, return -1; } +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). */ +static gboolean +g_output_stream_internal_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + GOutputStreamClass *class; + gboolean res; + + if (stream->priv->closed) + return TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + stream->priv->closing = TRUE; + + if (cancellable) + g_cancellable_push_current (cancellable); + + if (class->flush) + res = class->flush (stream, cancellable, error); + else + res = TRUE; + + if (!res) + { + /* flushing caused the error that we want to return, + * but we still want to close the underlying stream if possible + */ + if (class->close_fn) + class->close_fn (stream, cancellable, NULL); + } + else + { + res = TRUE; + if (class->close_fn) + res = class->close_fn (stream, cancellable, error); + } + + if (cancellable) + g_cancellable_pop_current (cancellable); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; + + return res; +} /** * g_output_stream_close: * @stream: A #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Closes the stream, releasing resources related to it. * @@ -493,7 +691,7 @@ g_output_stream_real_splice (GOutputStream *stream, * is important to check and report the error to the user, otherwise * there might be a loss of data as all data might not be written. * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * Cancelling a close will still leave the stream closed, but there some streams @@ -501,98 +699,67 @@ g_output_stream_real_splice (GOutputStream *stream, * cancellation (as with any error) there is no guarantee that all written * data will reach the target. * - * Return value: %TRUE on success, %FALSE on failure + * Returns: %TRUE on success, %FALSE on failure **/ gboolean g_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error) { - GOutputStreamClass *class; gboolean res; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - class = G_OUTPUT_STREAM_GET_CLASS (stream); - if (stream->priv->closed) return TRUE; if (!g_output_stream_set_pending (stream, error)) return FALSE; - stream->priv->closing = TRUE; - - if (cancellable) - g_cancellable_push_current (cancellable); - - if (class->flush) - res = class->flush (stream, cancellable, error); - else - res = TRUE; - - if (!res) - { - /* flushing caused the error that we want to return, - * but we still want to close the underlying stream if possible - */ - if (class->close_fn) - class->close_fn (stream, cancellable, NULL); - } - else - { - res = TRUE; - if (class->close_fn) - res = class->close_fn (stream, cancellable, error); - } - - if (cancellable) - g_cancellable_pop_current (cancellable); + res = g_output_stream_internal_close (stream, cancellable, error); - stream->priv->closing = FALSE; - stream->priv->closed = TRUE; g_output_stream_clear_pending (stream); return res; } static void -async_ready_callback_wrapper (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM (source_object); - - g_output_stream_clear_pending (stream); - if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); - g_object_unref (stream); -} - -static void -async_ready_close_callback_wrapper (GObject *source_object, +async_ready_write_callback_wrapper (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + gssize nwrote; + GError *error = NULL; - stream->priv->closing = FALSE; - stream->priv->closed = TRUE; g_output_stream_clear_pending (stream); - if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); - g_object_unref (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + nwrote = -1; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nwrote = class->write_finish (stream, res, &error); + } + + if (nwrote >= 0) + g_task_return_int (task, nwrote); + else + g_task_return_error (task, error); + g_object_unref (task); } /** * g_output_stream_write_async: * @stream: A #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Request an asynchronous write of @count bytes from @buffer into * the stream. When the operation is finished @callback will be called. @@ -610,6 +777,10 @@ async_ready_close_callback_wrapper (GObject *source_object, * requested size, as it can happen e.g. on a partial I/O error, * but generally we try to write as many bytes as requested. * + * You are guaranteed that this method will never fail with + * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the + * method will just wait until this changes. + * * Any outstanding I/O request with higher priority (lower numerical * value) will be executed before an outstanding request with lower * priority. Default priority is %G_PRIORITY_DEFAULT. @@ -620,7 +791,12 @@ async_ready_close_callback_wrapper (GObject *source_object, * * For the synchronous, blocking version of this function, see * g_output_stream_write(). - **/ + * + * Note that no copy of @buffer will be made, so it must stay valid + * until @callback is called. See g_output_stream_write_bytes_async() + * for a #GBytes version that will automatically hold a reference to + * the contents (without copying) for the duration of the call. + */ void g_output_stream_write_async (GOutputStream *stream, const void *buffer, @@ -631,57 +807,50 @@ g_output_stream_write_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; GError *error = NULL; + GTask *task; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); g_return_if_fail (buffer != NULL); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_write_async); + g_task_set_priority (task, io_priority); + if (count == 0) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_write_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_int (task, 0); + g_object_unref (task); return; } if (((gssize) count) < 0) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to %s"), - G_STRFUNC); + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Too large count value passed to %s"), + G_STRFUNC); + g_object_unref (task); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_error (task, error); + g_object_unref (task); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - stream->priv->outstanding_callback = callback; - g_object_ref (stream); class->write_async (stream, buffer, count, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_write_callback_wrapper, task); } /** * g_output_stream_write_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes a stream write operation. @@ -693,159 +862,444 @@ g_output_stream_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; - - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); - - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE); - /* Special case writes of 0 bytes */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async) - return 0; - } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->write_finish (stream, result, error); + /* @result is always the GTask created by g_output_stream_write_async(); + * we called class->write_finish() from async_ready_write_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); } -typedef struct { - GInputStream *source; - gpointer user_data; - GAsyncReadyCallback callback; -} SpliceUserData; +typedef struct +{ + const guint8 *buffer; + gsize to_write; + gsize bytes_written; +} AsyncWriteAll; static void -async_ready_splice_callback_wrapper (GObject *source_object, - GAsyncResult *res, - gpointer _data) +free_async_write_all (gpointer data) { - GOutputStream *stream = G_OUTPUT_STREAM (source_object); - SpliceUserData *data = _data; - - g_output_stream_clear_pending (stream); - - if (data->callback) - (*data->callback) (source_object, res, data->user_data); - - g_object_unref (stream); - g_object_unref (data->source); - g_free (data); + g_slice_free (AsyncWriteAll, data); } -/** - * g_output_stream_splice_async: - * @stream: a #GOutputStream. - * @source: a #GInputStream. - * @flags: a set of #GOutputStreamSpliceFlags. - * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback. - * @user_data: user data passed to @callback. - * - * Splices a stream asynchronously. - * When the operation is finished @callback will be called. - * You can then call g_output_stream_splice_finish() to get the - * result of the operation. - * - * For the synchronous, blocking version of this function, see - * g_output_stream_splice(). - **/ -void -g_output_stream_splice_async (GOutputStream *stream, - GInputStream *source, - GOutputStreamSpliceFlags flags, - int io_priority, - GCancellable *cancellable, - GAsyncReadyCallback callback, - gpointer user_data) +static void +write_all_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) { - GOutputStreamClass *class; - SpliceUserData *data; - GError *error = NULL; - - g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); - g_return_if_fail (G_IS_INPUT_STREAM (source)); + GTask *task = user_data; + AsyncWriteAll *data = g_task_get_task_data (task); - if (g_input_stream_is_closed (source)) + if (result) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Source stream is already closed")); - return; + GError *error = NULL; + gssize nwritten; + + nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error); + + if (nwritten == -1) + { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + g_assert_cmpint (nwritten, <=, data->to_write); + g_warn_if_fail (nwritten > 0); + + data->to_write -= nwritten; + data->bytes_written += nwritten; } - - if (!g_output_stream_set_pending (stream, &error)) + + if (data->to_write == 0) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); - return; + g_task_return_boolean (task, TRUE); + g_object_unref (task); } - class = G_OUTPUT_STREAM_GET_CLASS (stream); + else + g_output_stream_write_async (G_OUTPUT_STREAM (stream), + data->buffer + data->bytes_written, + data->to_write, + g_task_get_priority (task), + g_task_get_cancellable (task), + write_all_callback, task); +} - data = g_new0 (SpliceUserData, 1); - data->callback = callback; - data->user_data = user_data; - data->source = g_object_ref (source); - - g_object_ref (stream); - class->splice_async (stream, source, flags, io_priority, cancellable, - async_ready_splice_callback_wrapper, data); +static void +write_all_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + AsyncWriteAll *data = task_data; + GError *error = NULL; + + if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written, + g_task_get_cancellable (task), &error)) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } /** - * g_output_stream_splice_finish: - * @stream: a #GOutputStream. - * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to - * ignore. + * g_output_stream_write_all_async: + * @stream: A #GOutputStream + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write + * @count: the number of bytes to write + * @io_priority: the io priority of the request + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * - * Finishes an asynchronous stream splice operation. - * - * Returns: a #gssize of the number of bytes spliced. - **/ -gssize -g_output_stream_splice_finish (GOutputStream *stream, - GAsyncResult *result, - GError **error) + * Request an asynchronous write of @count bytes from @buffer into + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_write_all_finish() to get the result of the + * operation. + * + * This is the asynchronous version of g_output_stream_write_all(). + * + * Call g_output_stream_write_all_finish() to collect the result. + * + * Any outstanding I/O request with higher priority (lower numerical + * value) will be executed before an outstanding request with lower + * priority. Default priority is %G_PRIORITY_DEFAULT. + * + * Note that no copy of @buffer will be made, so it must stay valid + * until @callback is called. + * + * Since: 2.44 + */ +void +g_output_stream_write_all_async (GOutputStream *stream, + const void *buffer, + gsize count, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; + AsyncWriteAll *data; + GTask *task; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (buffer != NULL || count == 0); + + task = g_task_new (stream, cancellable, callback, user_data); + data = g_slice_new0 (AsyncWriteAll); + data->buffer = buffer; + data->to_write = count; - if (G_IS_SIMPLE_ASYNC_RESULT (result)) + g_task_set_task_data (task, data, free_async_write_all); + g_task_set_priority (task, io_priority); + + /* If async writes are going to be handled via the threadpool anyway + * then we may as well do it with a single dispatch instead of + * bouncing in and out. + */ + if (g_output_stream_async_write_is_via_threads (stream)) { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; + g_task_run_in_thread (task, write_all_async_thread); + g_object_unref (task); } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->splice_finish (stream, result, error); + else + write_all_callback (G_OBJECT (stream), NULL, task); +} + +/** + * g_output_stream_write_all_finish: + * @stream: a #GOutputStream + * @result: a #GAsyncResult + * @bytes_written: (out): location to store the number of bytes that was written to the stream + * @error: a #GError location to store the error occurring, or %NULL to ignore. + * + * Finishes an asynchronous stream write operation started with + * g_output_stream_write_all_async(). + * + * As a special exception to the normal conventions for functions that + * use #GError, if this function returns %FALSE (and sets @error) then + * @bytes_written will be set to the number of bytes that were + * successfully written before the error was encountered. This + * functionality is only available from C. If you need it from another + * language then you must write your own loop around + * g_output_stream_write_async(). + * + * Returns: %TRUE on success, %FALSE if there was an error + * + * Since: 2.44 + **/ +gboolean +g_output_stream_write_all_finish (GOutputStream *stream, + GAsyncResult *result, + gsize *bytes_written, + GError **error) +{ + GTask *task; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + task = G_TASK (result); + + if (bytes_written) + { + AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task); + + *bytes_written = data->bytes_written; + } + + return g_task_propagate_boolean (task, error); +} + +static void +write_bytes_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GTask *task = user_data; + GError *error = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), + result, &error); + if (nwrote == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, nwrote); + g_object_unref (task); +} + +/** + * g_output_stream_write_bytes_async: + * @stream: A #GOutputStream. + * @bytes: The bytes to write + * @io_priority: the io priority of the request. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * This function is similar to g_output_stream_write_async(), but + * takes a #GBytes as input. Due to the refcounted nature of #GBytes, + * this allows the stream to avoid taking a copy of the data. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write_async(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_write_bytes(). + **/ +void +g_output_stream_write_bytes_async (GOutputStream *stream, + GBytes *bytes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, g_bytes_ref (bytes), + (GDestroyNotify) g_bytes_unref); + + g_output_stream_write_async (stream, + data, size, + io_priority, + cancellable, + write_bytes_callback, + task); +} + +/** + * g_output_stream_write_bytes_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes a stream write-from-#GBytes operation. + * + * Returns: a #gssize containing the number of bytes written to the stream. + **/ +gssize +g_output_stream_write_bytes_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); + g_return_val_if_fail (g_task_is_valid (result, stream), -1); + + return g_task_propagate_int (G_TASK (result), error); +} + +static void +async_ready_splice_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer _data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = _data; + gssize nspliced; + GError *error = NULL; + + g_output_stream_clear_pending (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + nspliced = -1; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nspliced = class->splice_finish (stream, res, &error); + } + + if (nspliced >= 0) + g_task_return_int (task, nspliced); + else + g_task_return_error (task, error); + g_object_unref (task); +} + +/** + * g_output_stream_splice_async: + * @stream: a #GOutputStream. + * @source: a #GInputStream. + * @flags: a set of #GOutputStreamSpliceFlags. + * @io_priority: the io priority of the request. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback. + * @user_data: (closure): user data passed to @callback. + * + * Splices a stream asynchronously. + * When the operation is finished @callback will be called. + * You can then call g_output_stream_splice_finish() to get the + * result of the operation. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_splice(). + **/ +void +g_output_stream_splice_async (GOutputStream *stream, + GInputStream *source, + GOutputStreamSpliceFlags flags, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task; + GError *error = NULL; + + g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + g_return_if_fail (G_IS_INPUT_STREAM (source)); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_splice_async); + g_task_set_priority (task, io_priority); + g_task_set_task_data (task, g_object_ref (source), g_object_unref); + + if (g_input_stream_is_closed (source)) + { + g_task_return_new_error (task, + G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Source stream is already closed")); + g_object_unref (task); + return; + } + + if (!g_output_stream_set_pending (stream, &error)) + { + g_task_return_error (task, error); + g_object_unref (task); + return; + } + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + class->splice_async (stream, source, flags, io_priority, cancellable, + async_ready_splice_callback_wrapper, task); +} + +/** + * g_output_stream_splice_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes an asynchronous stream splice operation. + * + * Returns: a #gssize of the number of bytes spliced. Note that if the + * number of bytes spliced is greater than %G_MAXSSIZE, then that + * will be returned, and there is no way to determine the actual + * number of bytes spliced. + **/ +gssize +g_output_stream_splice_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE); + + /* @result is always the GTask created by g_output_stream_splice_async(); + * we called class->splice_finish() from async_ready_splice_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); +} + +static void +async_ready_flush_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + gboolean flushed; + GError *error = NULL; + + g_output_stream_clear_pending (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + flushed = FALSE; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + flushed = class->flush_finish (stream, res, &error); + } + + if (flushed) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); + g_object_unref (task); } /** * g_output_stream_flush_async: * @stream: a #GOutputStream. * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback to call when the request is satisfied - * @user_data: the data to pass to callback function + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * - * Flushes a stream asynchronously. + * Forces an asynchronous write of all user-space buffered data for + * the given @stream. * For behaviour details see g_output_stream_flush(). * * When the operation is finished @callback will be @@ -860,86 +1314,147 @@ g_output_stream_flush_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_flush_async); + g_task_set_priority (task, io_priority); + if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_error (task, error); + g_object_unref (task); return; } - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - class = G_OUTPUT_STREAM_GET_CLASS (stream); if (class->flush_async == NULL) { - simple = g_simple_async_result_new (G_OBJECT (stream), - async_ready_callback_wrapper, - user_data, - g_output_stream_flush_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } class->flush_async (stream, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_flush_callback_wrapper, task); } /** * g_output_stream_flush_finish: * @stream: a #GOutputStream. * @result: a GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes flushing an output stream. * - * Returns: %TRUE if flush operation suceeded, %FALSE otherwise. + * Returns: %TRUE if flush operation succeeded, %FALSE otherwise. **/ gboolean g_output_stream_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *klass; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE); + + /* @result is always the GTask created by g_output_stream_flush_async(); + * we called class->flush_finish() from async_ready_flush_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); +} + - if (G_IS_SIMPLE_ASYNC_RESULT (result)) +static void +async_ready_close_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = g_task_get_task_data (task); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; + + if (!error && !g_async_result_legacy_propagate_error (res, &error)) { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + class->close_finish (stream, res, + error ? NULL : &error); + } + + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); + g_object_unref (task); +} + +static void +async_ready_close_flushed_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = NULL; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); - /* Special case default implementation */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async) - return TRUE; + if (!g_async_result_legacy_propagate_error (res, &error)) + { + class->flush_finish (stream, res, &error); } - klass = G_OUTPUT_STREAM_GET_CLASS (stream); - return klass->flush_finish (stream, result, error); + /* propagate the possible error */ + if (error) + g_task_set_task_data (task, error, NULL); + + /* we still close, even if there was a flush error */ + class->close_async (stream, + g_task_get_priority (task), + g_task_get_cancellable (task), + async_ready_close_callback_wrapper, task); } +static void +real_close_async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GTask *task = user_data; + GError *error = NULL; + gboolean ret; + + g_output_stream_clear_pending (stream); + + ret = g_output_stream_internal_close_finish (stream, res, &error); + + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, ret); + + g_object_unref (task); +} /** * g_output_stream_close_async: * @stream: A #GOutputStream. * @io_priority: the io priority of the request. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function - * @cancellable: optional cancellable object + * @cancellable: (allow-none): optional cancellable object + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Requests an asynchronous close of the stream, releasing resources * related to it. When the operation is finished @callback will be @@ -959,46 +1474,88 @@ g_output_stream_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); - if (stream->priv->closed) + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_close_async); + g_task_set_priority (task, io_priority); + + if (!g_output_stream_set_pending (stream, &error)) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_close_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_error (task, error); + g_object_unref (task); return; } - if (!g_output_stream_set_pending (stream, &error)) + g_output_stream_internal_close_async (stream, io_priority, cancellable, + real_close_async_cb, task); +} + +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). + */ +void +g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task; + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_internal_close_async); + g_task_set_priority (task, io_priority); + + if (stream->priv->closed) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } - + class = G_OUTPUT_STREAM_GET_CLASS (stream); stream->priv->closing = TRUE; - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - class->close_async (stream, io_priority, cancellable, - async_ready_close_callback_wrapper, user_data); + + /* Call close_async directly if there is no need to flush, or if the flush + can be done sync (in the output stream async close thread) */ + if (class->flush_async == NULL || + (class->flush_async == g_output_stream_real_flush_async && + (class->flush == NULL || class->close_async == g_output_stream_real_close_async))) + { + class->close_async (stream, io_priority, cancellable, + async_ready_close_callback_wrapper, task); + } + else + { + /* First do an async flush, then do the async close in the callback + wrapper (see async_ready_close_flushed_callback_wrapper) */ + class->flush_async (stream, io_priority, cancellable, + async_ready_close_flushed_callback_wrapper, task); + } +} + +static gboolean +g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); } /** * g_output_stream_close_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Closes an output stream. @@ -1010,25 +1567,14 @@ g_output_stream_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); - - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; - - /* Special case already closed */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async) - return TRUE; - } + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE); - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->close_finish (stream, result, error); + /* @result is always the GTask created by g_output_stream_close_async(); + * we called class->close_finish() from async_ready_close_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); } /** @@ -1087,14 +1633,14 @@ g_output_stream_has_pending (GOutputStream *stream) /** * g_output_stream_set_pending: * @stream: a #GOutputStream. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Sets @stream to have actions pending. If the pending flag is * already set or @stream is closed, it will return %FALSE and set * @error. * - * Return value: %TRUE if pending was previously unset and is now set. + * Returns: %TRUE if pending was previously unset and is now set. **/ gboolean g_output_stream_set_pending (GOutputStream *stream, @@ -1137,6 +1683,28 @@ g_output_stream_clear_pending (GOutputStream *stream) stream->priv->pending = FALSE; } +/** + * g_output_stream_async_write_is_via_threads: + * @stream: a #GOutputStream. + * + * Checks if an ouput stream's write_async function uses threads. + * + * Returns: %TRUE if @stream's write_async function uses threads. + **/ +gboolean +g_output_stream_async_write_is_via_threads (GOutputStream *stream) +{ + GOutputStreamClass *class; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + return (class->write_async == g_output_stream_real_write_async && + !(G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))); +} + /******************************************** * Default implementation of async ops * @@ -1149,23 +1717,77 @@ typedef struct { } WriteData; static void -write_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +free_write_data (WriteData *op) { - WriteData *op; + g_slice_free (WriteData, op); +} + +static void +write_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + WriteData *op = task_data; GOutputStreamClass *class; GError *error = NULL; + gssize count_written; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + count_written = class->write_fn (stream, op->buffer, op->count_requested, + cancellable, &error); + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); +} + +static void write_async_pollable (GPollableOutputStream *stream, + GTask *task); + +static gboolean +write_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GTask *task = user_data; + + write_async_pollable (stream, task); + return FALSE; +} + +static void +write_async_pollable (GPollableOutputStream *stream, + GTask *task) +{ + GError *error = NULL; + WriteData *op = g_task_get_task_data (task); + gssize count_written; + + if (g_task_return_error_if_cancelled (task)) + return; - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (res); - op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested, - cancellable, &error); - if (op->count_written == -1) + count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, op->buffer, op->count_requested, &error); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_simple_async_result_set_from_error (res, error); + GSource *source; + g_error_free (error); + + source = g_pollable_output_stream_create_source (stream, + g_task_get_cancellable (task)); + g_task_attach_source (task, source, + (GSourceFunc) write_async_pollable_ready); + g_source_unref (source); + return; } + + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); } static void @@ -1177,17 +1799,21 @@ g_output_stream_real_write_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; WriteData *op; - op = g_new0 (WriteData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + op = g_slice_new0 (WriteData); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_check_cancellable (task, FALSE); + g_task_set_task_data (task, op, (GDestroyNotify) free_write_data); op->buffer = buffer; op->count_requested = count; - - g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); - g_object_unref (res); + + if (!g_output_stream_async_write_is_via_threads (stream)) + write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task); + else + g_task_run_in_thread (task, write_async_thread); + g_object_unref (task); } static gssize @@ -1195,44 +1821,207 @@ g_output_stream_real_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - WriteData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->count_written; + return g_task_propagate_int (G_TASK (result), error); } typedef struct { GInputStream *source; GOutputStreamSpliceFlags flags; - gssize bytes_copied; + gssize n_read; + gssize n_written; + gsize bytes_copied; + GError *error; + guint8 *buffer; } SpliceData; static void -splice_async_thread (GSimpleAsyncResult *result, - GObject *object, - GCancellable *cancellable) +free_splice_data (SpliceData *op) +{ + g_clear_pointer (&op->buffer, g_free); + g_object_unref (op->source); + g_clear_error (&op->error); + g_free (op); +} + +static void +real_splice_async_complete_cb (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE && + !g_input_stream_is_closed (op->source)) + return; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET && + !g_output_stream_is_closed (g_task_get_source_object (task))) + return; + + if (op->error != NULL) + { + g_task_return_error (task, op->error); + op->error = NULL; + } + else + { + g_task_return_int (task, op->bytes_copied); + } + + g_object_unref (task); +} + +static void +real_splice_async_close_input_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + + g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_close_output_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + GError **error = (op->error == NULL) ? &op->error : NULL; + + g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_complete (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + gboolean done = TRUE; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) + { + done = FALSE; + g_input_stream_close_async (op->source, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_input_cb, task); + } + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) + { + done = FALSE; + g_output_stream_internal_close_async (g_task_get_source_object (task), + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_output_cb, + task); + } + + if (done) + real_splice_async_complete_cb (task); +} + +static void real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data); + +static void +real_splice_async_write_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) { - SpliceData *op; GOutputStreamClass *class; - GError *error = NULL; - GOutputStream *stream; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; - stream = G_OUTPUT_STREAM (object); - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (result); - - op->bytes_copied = class->splice (stream, - op->source, - op->flags, - cancellable, - &error); - if (op->bytes_copied == -1) + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error); + + if (ret == -1) { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); + real_splice_async_complete (task); + return; + } + + op->n_written += ret; + op->bytes_copied += ret; + if (op->bytes_copied > G_MAXSSIZE) + op->bytes_copied = G_MAXSSIZE; + + if (op->n_written < op->n_read) + { + class->write_async (g_task_get_source_object (task), + op->buffer + op->n_written, + op->n_read - op->n_written, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); + return; + } + + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); +} + +static void +real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = g_input_stream_read_finish (op->source, res, &op->error); + if (ret == -1 || ret == 0) + { + real_splice_async_complete (task); + return; } + + op->n_read = ret; + op->n_written = 0; + + class->write_async (g_task_get_source_object (task), op->buffer, + op->n_read, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); +} + +static void +splice_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + SpliceData *op = task_data; + GOutputStreamClass *class; + GError *error = NULL; + gssize bytes_copied; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + bytes_copied = class->splice (stream, + op->source, + op->flags, + cancellable, + &error); + if (bytes_copied == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, bytes_copied); } static void @@ -1244,20 +2033,29 @@ g_output_stream_real_splice_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; SpliceData *op; op = g_new0 (SpliceData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data); op->flags = flags; - op->source = source; + op->source = g_object_ref (source); - /* TODO: In the case where both source and destintion have - non-threadbased async calls we can use a true async copy here */ - - g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable); - g_object_unref (res); + if (g_input_stream_async_read_is_via_threads (source) && + g_output_stream_async_write_is_via_threads (stream)) + { + g_task_run_in_thread (task, splice_async_thread); + g_object_unref (task); + } + else + { + op->buffer = g_malloc (8192); + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); + } } static gssize @@ -1265,34 +2063,32 @@ g_output_stream_real_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - SpliceData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->bytes_copied; + return g_task_propagate_int (G_TASK (result), error); } static void -flush_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +flush_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; gboolean result; GError *error = NULL; - class = G_OUTPUT_STREAM_GET_CLASS (object); + class = G_OUTPUT_STREAM_GET_CLASS (stream); result = TRUE; if (class->flush) - result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); + result = class->flush (stream, cancellable, &error); - if (!result) - { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); - } + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1302,12 +2098,12 @@ g_output_stream_real_flush_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - - g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, flush_async_thread); + g_object_unref (task); } static gboolean @@ -1315,33 +2111,52 @@ g_output_stream_real_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - return TRUE; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); } static void -close_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +close_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; GError *error = NULL; - gboolean result; + gboolean result = TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + /* Do a flush here if there is a flush function, and we did not have to do + * an async flush before (see g_output_stream_close_async) + */ + if (class->flush != NULL && + (class->flush_async == NULL || + class->flush_async == g_output_stream_real_flush_async)) + { + result = class->flush (stream, cancellable, &error); + } /* Auto handling of cancelation disabled, and ignore cancellation, since we want to close things anyway, although possibly in a quick-n-dirty way. At least we never want to leak open handles */ - class = G_OUTPUT_STREAM_GET_CLASS (object); if (class->close_fn) { - result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error); + /* Make sure to close, even if the flush failed (see sync close) */ if (!result) - { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); - } + class->close_fn (stream, cancellable, NULL); + else + result = class->close_fn (stream, cancellable, &error); } + + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1351,14 +2166,12 @@ g_output_stream_real_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; - - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async); + GTask *task; - g_simple_async_result_set_handle_cancellation (res, FALSE); - - g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, close_async_thread); + g_object_unref (task); } static gboolean @@ -1366,10 +2179,7 @@ g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async); - return TRUE; -} + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); -#define __G_OUTPUT_STREAM_C__ -#include "gioaliasdef.c" + return g_task_propagate_boolean (G_TASK (result), error); +}