X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgoutputstream.c;h=e7b1167364f08c60918d6362aa7edf7513812b48;hb=e55a953642a9e402f4363f9fa347b6061dd78990;hp=f1f780e2130927b4e112ab2437951e05d4e92052;hpb=cc3de68e2151cf3341115212c56c17714ca03bb2;p=platform%2Fupstream%2Fglib.git diff --git a/gio/goutputstream.c b/gio/goutputstream.c index f1f780e..e7b1167 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -13,37 +13,47 @@ * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General - * Public License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place, Suite 330, - * Boston, MA 02111-1307, USA. + * Public License along with this library; if not, see . * * Author: Alexander Larsson */ -#include +#include "config.h" +#include #include "goutputstream.h" -#include "gsimpleasyncresult.h" +#include "gcancellable.h" +#include "gasyncresult.h" +#include "gtask.h" +#include "ginputstream.h" +#include "gioerror.h" +#include "gioprivate.h" #include "glibintl.h" - -#include "gioalias.h" +#include "gpollableoutputstream.h" /** * SECTION:goutputstream * @short_description: Base class for implementing streaming output + * @include: gio/gio.h * + * #GOutputStream has functions to write to a stream (g_output_stream_write()), + * to close a stream (g_output_stream_close()) and to flush pending writes + * (g_output_stream_flush()). * + * To copy the content of an input stream to an output stream without + * manually handling the reads and writes, use g_output_stream_splice(). * + * All of these functions have async variants too. **/ -G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); - struct _GOutputStreamPrivate { guint closed : 1; guint pending : 1; - guint cancelled : 1; + guint closing : 1; GAsyncReadyCallback outstanding_callback; }; +G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT) + static gssize g_output_stream_real_splice (GOutputStream *stream, GInputStream *source, GOutputStreamSpliceFlags flags, @@ -85,17 +95,17 @@ static void g_output_stream_real_close_async (GOutputStream *s static gboolean g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error); - -static void -g_output_stream_finalize (GObject *object) -{ - GOutputStream *stream; - - stream = G_OUTPUT_STREAM (object); - - if (G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) - (*G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) (object); -} +static gboolean g_output_stream_internal_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error); +static void g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer data); +static gboolean g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error); static void g_output_stream_dispose (GObject *object) @@ -106,19 +116,15 @@ g_output_stream_dispose (GObject *object) if (!stream->priv->closed) g_output_stream_close (stream, NULL, NULL); - - if (G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) - (*G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) (object); + + G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object); } static void g_output_stream_class_init (GOutputStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - - g_type_class_add_private (klass, sizeof (GOutputStreamPrivate)); - - gobject_class->finalize = g_output_stream_finalize; + gobject_class->dispose = g_output_stream_dispose; klass->splice = g_output_stream_real_splice; @@ -136,40 +142,41 @@ g_output_stream_class_init (GOutputStreamClass *klass) static void g_output_stream_init (GOutputStream *stream) { - stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, - G_TYPE_OUTPUT_STREAM, - GOutputStreamPrivate); + stream->priv = g_output_stream_get_instance_private (stream); } /** * g_output_stream_write: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. * - * If count is zero returns zero and does nothing. A value of @count + * If count is 0, returns 0 and does nothing. A value of @count * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. * * On success, the number of bytes written to the stream is returned. * It is not an error if this is not the same as the requested size, as it - * can happen e.g. on a partial i/o error, or if the there is not enough - * storage in the stream. All writes either block until at least one byte - * is written, so zero is never returned (unless @count is zero). + * can happen e.g. on a partial I/O error, or if there is not enough + * storage in the stream. All writes block until at least one byte + * is written or an error occurs; 0 is never returned (unless + * @count is 0). * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation - * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an * operation was partially finished when the operation was cancelled the * partial result will be returned, without an error. * * On error -1 is returned and @error is set accordingly. * - * Return value: Number of bytes written, or -1 on error + * Virtual: write_fn + * + * Returns: Number of bytes written, or -1 on error **/ gssize g_output_stream_write (GOutputStream *stream, @@ -190,7 +197,7 @@ g_output_stream_write (GOutputStream *stream, if (((gssize) count) < 0) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to g_output_stream_write")); + _("Too large count value passed to %s"), G_STRFUNC); return -1; } @@ -198,8 +205,8 @@ g_output_stream_write (GOutputStream *stream, if (class->write_fn == NULL) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, - _("Output stream doesn't implement write")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Output stream doesn't implement write")); return -1; } @@ -207,12 +214,12 @@ g_output_stream_write (GOutputStream *stream, return -1; if (cancellable) - g_push_current_cancellable (cancellable); + g_cancellable_push_current (cancellable); res = class->write_fn (stream, buffer, count, cancellable, error); if (cancellable) - g_pop_current_cancellable (cancellable); + g_cancellable_pop_current (cancellable); g_output_stream_clear_pending (stream); @@ -222,12 +229,12 @@ g_output_stream_write (GOutputStream *stream, /** * g_output_stream_write_all: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @bytes_written: location to store the number of bytes that was + * @bytes_written: (out): location to store the number of bytes that was * written to the stream - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. @@ -238,11 +245,11 @@ g_output_stream_write (GOutputStream *stream, * On a successful write of @count bytes, %TRUE is returned, and @bytes_written * is set to @count. * - * If there is an error during the operation FALSE is returned and @error + * If there is an error during the operation %FALSE is returned and @error * is set to indicate the error status, @bytes_written is updated to contain - * the number of bytes written into the stream before the error occured. + * the number of bytes written into the stream before the error occurred. * - * Return value: %TRUE on success, %FALSE if there was an error + * Returns: %TRUE on success, %FALSE if there was an error **/ gboolean g_output_stream_write_all (GOutputStream *stream, @@ -283,13 +290,150 @@ g_output_stream_write_all (GOutputStream *stream, } /** + * g_output_stream_printf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @...: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @... into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_printf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + ...) +{ + va_list args; + gboolean success; + + va_start (args, format); + success = g_output_stream_vprintf (stream, bytes_written, cancellable, + error, format, args); + va_end (args); + + return success; +} + +/** + * g_output_stream_vprintf: + * @stream: a #GOutputStream. + * @bytes_written: (out): location to store the number of bytes that was + * written to the stream + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore + * @format: the format string. See the printf() documentation + * @args: the parameters to insert into the format string + * + * This is a utility function around g_output_stream_write_all(). It + * uses g_strdup_vprintf() to turn @format and @args into a string that + * is then written to @stream. + * + * See the documentation of g_output_stream_write_all() about the + * behavior of the actual write operation. + * + * Note that partial writes cannot be properly checked with this + * function due to the variable length of the written string, if you + * need precise control over partial write failures, you need to + * create you own printf()-like wrapper around g_output_stream_write() + * or g_output_stream_write_all(). + * + * Since: 2.40 + * + * Returns: %TRUE on success, %FALSE if there was an error + **/ +gboolean +g_output_stream_vprintf (GOutputStream *stream, + gsize *bytes_written, + GCancellable *cancellable, + GError **error, + const gchar *format, + va_list args) +{ + gchar *text; + gboolean success; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE); + g_return_val_if_fail (error == NULL || *error == NULL, FALSE); + g_return_val_if_fail (format != NULL, FALSE); + + text = g_strdup_vprintf (format, args); + success = g_output_stream_write_all (stream, + text, strlen (text), + bytes_written, cancellable, error); + g_free (text); + + return success; +} + +/** + * g_output_stream_write_bytes: + * @stream: a #GOutputStream. + * @bytes: the #GBytes to write + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore + * + * A wrapper function for g_output_stream_write() which takes a + * #GBytes as input. This can be more convenient for use by language + * bindings or in other cases where the refcounted nature of #GBytes + * is helpful over a bare pointer interface. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * Returns: Number of bytes written, or -1 on error + **/ +gssize +g_output_stream_write_bytes (GOutputStream *stream, + GBytes *bytes, + GCancellable *cancellable, + GError **error) +{ + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + return g_output_stream_write (stream, + data, size, + cancellable, + error); +} + +/** * g_output_stream_flush: * @stream: a #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * - * Flushed any outstanding buffers in the stream. Will block during - * the operation. Closing the stream will implicitly cause a flush. + * Forces a write of all user-space buffered data for the given + * @stream. Will block during the operation. Closing the stream will + * implicitly cause a flush. * * This function is optional for inherited classes. * @@ -297,7 +441,7 @@ g_output_stream_write_all (GOutputStream *stream, * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * - * Return value: %TRUE on success, %FALSE on error + * Returns: %TRUE on success, %FALSE on error **/ gboolean g_output_stream_flush (GOutputStream *stream, @@ -318,12 +462,12 @@ g_output_stream_flush (GOutputStream *stream, if (class->flush) { if (cancellable) - g_push_current_cancellable (cancellable); + g_cancellable_push_current (cancellable); res = class->flush (stream, cancellable, error); if (cancellable) - g_pop_current_cancellable (cancellable); + g_cancellable_pop_current (cancellable); } g_output_stream_clear_pending (stream); @@ -336,13 +480,17 @@ g_output_stream_flush (GOutputStream *stream, * @stream: a #GOutputStream. * @source: a #GInputStream. * @flags: a set of #GOutputStreamSpliceFlags. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: a #GError location to store the error occuring, or %NULL to + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Splices an input stream into an output stream. * - * Returns: a #gssize containig the size of the data spliced. + * Returns: a #gssize containing the size of the data spliced, or + * -1 if an error occurred. Note that if the number of bytes + * spliced is greater than %G_MAXSSIZE, then that will be + * returned, and there is no way to determine the actual number + * of bytes spliced. **/ gssize g_output_stream_splice (GOutputStream *stream, @@ -352,35 +500,34 @@ g_output_stream_splice (GOutputStream *stream, GError **error) { GOutputStreamClass *class; - gboolean res; + gssize bytes_copied; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1); if (g_input_stream_is_closed (source)) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Source stream is already closed")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Source stream is already closed")); return -1; } if (!g_output_stream_set_pending (stream, error)) return -1; - + class = G_OUTPUT_STREAM_GET_CLASS (stream); - res = TRUE; if (cancellable) - g_push_current_cancellable (cancellable); - - res = class->splice (stream, source, flags, cancellable, error); - + g_cancellable_push_current (cancellable); + + bytes_copied = class->splice (stream, source, flags, cancellable, error); + if (cancellable) - g_pop_current_cancellable (cancellable); - + g_cancellable_pop_current (cancellable); + g_output_stream_clear_pending (stream); - return res; + return bytes_copied; } static gssize @@ -392,21 +539,21 @@ g_output_stream_real_splice (GOutputStream *stream, { GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream); gssize n_read, n_written; - gssize bytes_copied; + gsize bytes_copied; char buffer[8192], *p; gboolean res; bytes_copied = 0; - if (class->write_fn == NULL) + if (class->write_fn == NULL) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, - _("Output stream doesn't implement write")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Output stream doesn't implement write")); res = FALSE; goto notsupported; } - + res = TRUE; - do + do { n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error); if (n_read == -1) @@ -414,7 +561,7 @@ g_output_stream_real_splice (GOutputStream *stream, res = FALSE; break; } - + if (n_read == 0) break; @@ -432,6 +579,9 @@ g_output_stream_real_splice (GOutputStream *stream, n_read -= n_written; bytes_copied += n_written; } + + if (bytes_copied > G_MAXSSIZE) + bytes_copied = G_MAXSSIZE; } while (res); @@ -439,31 +589,78 @@ g_output_stream_real_splice (GOutputStream *stream, if (!res) error = NULL; /* Ignore further errors */ - if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_SOURCE) + if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) { /* Don't care about errors in source here */ g_input_stream_close (source, cancellable, NULL); } - if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_TARGET) + if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) { /* But write errors on close are bad! */ - if (!class->close_fn (stream, cancellable, error)) - res = FALSE; + res = g_output_stream_internal_close (stream, cancellable, error); } if (res) return bytes_copied; - + return -1; } +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). */ +static gboolean +g_output_stream_internal_close (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + GOutputStreamClass *class; + gboolean res; + + if (stream->priv->closed) + return TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + stream->priv->closing = TRUE; + + if (cancellable) + g_cancellable_push_current (cancellable); + + if (class->flush) + res = class->flush (stream, cancellable, error); + else + res = TRUE; + + if (!res) + { + /* flushing caused the error that we want to return, + * but we still want to close the underlying stream if possible + */ + if (class->close_fn) + class->close_fn (stream, cancellable, NULL); + } + else + { + res = TRUE; + if (class->close_fn) + res = class->close_fn (stream, cancellable, error); + } + + if (cancellable) + g_cancellable_pop_current (cancellable); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; + + return res; +} /** * g_output_stream_close: * @stream: A #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Closes the stream, releasing resources related to it. * @@ -474,8 +671,8 @@ g_output_stream_real_splice (GOutputStream *stream, * stream. * * Streams will be automatically closed when the last reference - * is dropped, but you might want to call make sure resources - * are released as early as possible. + * is dropped, but you might want to call this function to make sure + * resources are released as early as possible. * * Some streams might keep the backing store of the stream (e.g. a file descriptor) * open after the stream is closed. See the documentation for the individual @@ -483,11 +680,11 @@ g_output_stream_real_splice (GOutputStream *stream, * * On failure the first error that happened will be reported, but the close * operation will finish as much as possible. A stream that failed to - * close will still return %G_IO_ERROR_CLOSED all operations. Still, it + * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it * is important to check and report the error to the user, otherwise * there might be a loss of data as all data might not be written. * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * Cancelling a close will still leave the stream closed, but there some streams @@ -495,98 +692,72 @@ g_output_stream_real_splice (GOutputStream *stream, * cancellation (as with any error) there is no guarantee that all written * data will reach the target. * - * Return value: %TRUE on success, %FALSE on failure + * Returns: %TRUE on success, %FALSE on failure **/ gboolean g_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error) { - GOutputStreamClass *class; gboolean res; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - class = G_OUTPUT_STREAM_GET_CLASS (stream); - if (stream->priv->closed) return TRUE; if (!g_output_stream_set_pending (stream, error)) return FALSE; - if (cancellable) - g_push_current_cancellable (cancellable); + res = g_output_stream_internal_close (stream, cancellable, error); - if (class->flush) - res = class->flush (stream, cancellable, error); - else - res = TRUE; - - if (!res) - { - /* flushing caused the error that we want to return, - * but we still want to close the underlying stream if possible - */ - if (class->close_fn) - class->close_fn (stream, cancellable, NULL); - } - else - { - res = TRUE; - if (class->close_fn) - res = class->close_fn (stream, cancellable, error); - } - - if (cancellable) - g_pop_current_cancellable (cancellable); - - stream->priv->closed = TRUE; g_output_stream_clear_pending (stream); return res; } static void -async_ready_callback_wrapper (GObject *source_object, - GAsyncResult *res, - gpointer user_data) -{ - GOutputStream *stream = G_OUTPUT_STREAM (source_object); - - g_output_stream_clear_pending (stream); - if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); - g_object_unref (stream); -} - -static void -async_ready_close_callback_wrapper (GObject *source_object, +async_ready_write_callback_wrapper (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + gssize nwrote; + GError *error = NULL; - stream->priv->closed = TRUE; g_output_stream_clear_pending (stream); - if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); - g_object_unref (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + nwrote = -1; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nwrote = class->write_finish (stream, res, &error); + } + + if (nwrote >= 0) + g_task_return_int (task, nwrote); + else + g_task_return_error (task, error); + g_object_unref (task); } /** * g_output_stream_write_async: * @stream: A #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Request an asynchronous write of @count bytes from @buffer into - * the stream. When the operation is finished @callback will be called, - * giving the results. + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_write_finish() to get the result of the + * operation. * * During an async request no other sync and async calls are allowed, * and will result in %G_IO_ERROR_PENDING errors. @@ -599,6 +770,10 @@ async_ready_close_callback_wrapper (GObject *source_object, * requested size, as it can happen e.g. on a partial I/O error, * but generally we try to write as many bytes as requested. * + * You are guaranteed that this method will never fail with + * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the + * method will just wait until this changes. + * * Any outstanding I/O request with higher priority (lower numerical * value) will be executed before an outstanding request with lower * priority. Default priority is %G_PRIORITY_DEFAULT. @@ -609,7 +784,12 @@ async_ready_close_callback_wrapper (GObject *source_object, * * For the synchronous, blocking version of this function, see * g_output_stream_write(). - **/ + * + * Note that no copy of @buffer will be made, so it must stay valid + * until @callback is called. See g_output_stream_write_bytes_async() + * for a #GBytes version that will automatically hold a reference to + * the contents (without copying) for the duration of the call. + */ void g_output_stream_write_async (GOutputStream *stream, const void *buffer, @@ -620,56 +800,50 @@ g_output_stream_write_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; GError *error = NULL; + GTask *task; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); g_return_if_fail (buffer != NULL); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_write_async); + g_task_set_priority (task, io_priority); + if (count == 0) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_write_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_int (task, 0); + g_object_unref (task); return; } if (((gssize) count) < 0) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to g_output_stream_write_async")); + g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, + _("Too large count value passed to %s"), + G_STRFUNC); + g_object_unref (task); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_error (task, error); + g_object_unref (task); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - stream->priv->outstanding_callback = callback; - g_object_ref (stream); class->write_async (stream, buffer, count, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_write_callback_wrapper, task); } /** * g_output_stream_write_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes a stream write operation. @@ -681,32 +855,104 @@ g_output_stream_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE); - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + /* @result is always the GTask created by g_output_stream_write_async(); + * we called class->write_finish() from async_ready_write_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); +} - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; +static void +write_bytes_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GTask *task = user_data; + GError *error = NULL; + gssize nwrote; - /* Special case writes of 0 bytes */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async) - return 0; - } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->write_finish (stream, result, error); + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), + result, &error); + if (nwrote == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, nwrote); + g_object_unref (task); } -typedef struct { - GInputStream *source; - gpointer user_data; - GAsyncReadyCallback callback; -} SpliceUserData; +/** + * g_output_stream_write_bytes_async: + * @stream: A #GOutputStream. + * @bytes: The bytes to write + * @io_priority: the io priority of the request. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * This function is similar to g_output_stream_write_async(), but + * takes a #GBytes as input. Due to the refcounted nature of #GBytes, + * this allows the stream to avoid taking a copy of the data. + * + * However, note that this function may still perform partial writes, + * just like g_output_stream_write_async(). If that occurs, to continue + * writing, you will need to create a new #GBytes containing just the + * remaining bytes, using g_bytes_new_from_bytes(). Passing the same + * #GBytes instance multiple times potentially can result in duplicated + * data in the output stream. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_write_bytes(). + **/ +void +g_output_stream_write_bytes_async (GOutputStream *stream, + GBytes *bytes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GTask *task; + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, g_bytes_ref (bytes), + (GDestroyNotify) g_bytes_unref); + + g_output_stream_write_async (stream, + data, size, + io_priority, + cancellable, + write_bytes_callback, + task); +} + +/** + * g_output_stream_write_bytes_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes a stream write-from-#GBytes operation. + * + * Returns: a #gssize containing the number of bytes written to the stream. + **/ +gssize +g_output_stream_write_bytes_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); + g_return_val_if_fail (g_task_is_valid (result, stream), -1); + + return g_task_propagate_int (G_TASK (result), error); +} static void async_ready_splice_callback_wrapper (GObject *source_object, @@ -714,16 +960,26 @@ async_ready_splice_callback_wrapper (GObject *source_object, gpointer _data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); - SpliceUserData *data = _data; - + GOutputStreamClass *class; + GTask *task = _data; + gssize nspliced; + GError *error = NULL; + g_output_stream_clear_pending (stream); - if (data->callback) - (*data->callback) (source_object, res, data->user_data); - - g_object_unref (stream); - g_object_unref (data->source); - g_free (data); + if (g_async_result_legacy_propagate_error (res, &error)) + nspliced = -1; + else + { + class = G_OUTPUT_STREAM_GET_CLASS (stream); + nspliced = class->splice_finish (stream, res, &error); + } + + if (nspliced >= 0) + g_task_return_int (task, nspliced); + else + g_task_return_error (task, error); + g_object_unref (task); } /** @@ -732,12 +988,17 @@ async_ready_splice_callback_wrapper (GObject *source_object, * @source: a #GInputStream. * @flags: a set of #GOutputStreamSpliceFlags. * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback. - * @user_data: user data passed to @callback. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback. + * @user_data: (closure): user data passed to @callback. * * Splices a stream asynchronously. - * + * When the operation is finished @callback will be called. + * You can then call g_output_stream_splice_finish() to get the + * result of the operation. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_splice(). **/ void g_output_stream_splice_async (GOutputStream *stream, @@ -749,87 +1010,111 @@ g_output_stream_splice_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - SpliceUserData *data; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); g_return_if_fail (G_IS_INPUT_STREAM (source)); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_splice_async); + g_task_set_priority (task, io_priority); + g_task_set_task_data (task, g_object_ref (source), g_object_unref); + if (g_input_stream_is_closed (source)) { - g_simple_async_report_error_in_idle (G_OBJECT (stream), - callback, - user_data, - G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Source stream is already closed")); + g_task_return_new_error (task, + G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Source stream is already closed")); + g_object_unref (task); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_error (task, error); + g_object_unref (task); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - data = g_new0 (SpliceUserData, 1); - data->callback = callback; - data->user_data = user_data; - data->source = g_object_ref (source); - - g_object_ref (stream); class->splice_async (stream, source, flags, io_priority, cancellable, - async_ready_splice_callback_wrapper, data); + async_ready_splice_callback_wrapper, task); } /** * g_output_stream_splice_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes an asynchronous stream splice operation. * - * Returns: a #gssize of the number of bytes spliced. + * Returns: a #gssize of the number of bytes spliced. Note that if the + * number of bytes spliced is greater than %G_MAXSSIZE, then that + * will be returned, and there is no way to determine the actual + * number of bytes spliced. **/ gssize g_output_stream_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE); - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + /* @result is always the GTask created by g_output_stream_splice_async(); + * we called class->splice_finish() from async_ready_splice_callback_wrapper. + */ + return g_task_propagate_int (G_TASK (result), error); +} + +static void +async_ready_flush_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + gboolean flushed; + GError *error = NULL; - if (G_IS_SIMPLE_ASYNC_RESULT (result)) + g_output_stream_clear_pending (stream); + + if (g_async_result_legacy_propagate_error (res, &error)) + flushed = FALSE; + else { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; + class = G_OUTPUT_STREAM_GET_CLASS (stream); + flushed = class->flush_finish (stream, res, &error); } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->splice_finish (stream, result, error); + + if (flushed) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); + g_object_unref (task); } /** * g_output_stream_flush_async: * @stream: a #GOutputStream. * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback to call when the request is satisfied - * @user_data: the data to pass to callback function - * - * Flushes a stream asynchronously. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * + * Forces an asynchronous write of all user-space buffered data for + * the given @stream. + * For behaviour details see g_output_stream_flush(). + * + * When the operation is finished @callback will be + * called. You can then call g_output_stream_flush_finish() to get the + * result of the operation. **/ void g_output_stream_flush_async (GOutputStream *stream, @@ -839,90 +1124,152 @@ g_output_stream_flush_async (GOutputStream *stream, gpointer user_data) { GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_flush_async); + g_task_set_priority (task, io_priority); + if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_error (task, error); + g_object_unref (task); return; } - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - class = G_OUTPUT_STREAM_GET_CLASS (stream); if (class->flush_async == NULL) { - simple = g_simple_async_result_new (G_OBJECT (stream), - async_ready_callback_wrapper, - user_data, - g_output_stream_flush_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } class->flush_async (stream, io_priority, cancellable, - async_ready_callback_wrapper, user_data); + async_ready_flush_callback_wrapper, task); } /** * g_output_stream_flush_finish: * @stream: a #GOutputStream. * @result: a GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes flushing an output stream. * - * Returns: %TRUE if flush operation suceeded, %FALSE otherwise. + * Returns: %TRUE if flush operation succeeded, %FALSE otherwise. **/ gboolean g_output_stream_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *klass; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE); + + /* @result is always the GTask created by g_output_stream_flush_async(); + * we called class->flush_finish() from async_ready_flush_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); +} + + +static void +async_ready_close_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = g_task_get_task_data (task); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; - if (G_IS_SIMPLE_ASYNC_RESULT (result)) + if (!error && !g_async_result_legacy_propagate_error (res, &error)) { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; + class = G_OUTPUT_STREAM_GET_CLASS (stream); - /* Special case default implementation */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async) - return TRUE; + class->close_finish (stream, res, + error ? NULL : &error); } - klass = G_OUTPUT_STREAM_GET_CLASS (stream); - return klass->flush_finish (stream, result, error); + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, TRUE); + g_object_unref (task); } +static void +async_ready_close_flushed_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + GTask *task = user_data; + GError *error = NULL; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + if (!g_async_result_legacy_propagate_error (res, &error)) + { + class->flush_finish (stream, res, &error); + } + + /* propagate the possible error */ + if (error) + g_task_set_task_data (task, error, NULL); + + /* we still close, even if there was a flush error */ + class->close_async (stream, + g_task_get_priority (task), + g_task_get_cancellable (task), + async_ready_close_callback_wrapper, task); +} + +static void +real_close_async_cb (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GTask *task = user_data; + GError *error = NULL; + gboolean ret; + + g_output_stream_clear_pending (stream); + + ret = g_output_stream_internal_close_finish (stream, res, &error); + + if (error != NULL) + g_task_return_error (task, error); + else + g_task_return_boolean (task, ret); + + g_object_unref (task); +} /** * g_output_stream_close_async: * @stream: A #GOutputStream. * @io_priority: the io priority of the request. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function - * @cancellable: optional cancellable object + * @cancellable: (allow-none): optional cancellable object + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Requests an asynchronous close of the stream, releasing resources * related to it. When the operation is finished @callback will be - * called, giving the results. + * called. You can then call g_output_stream_close_finish() to get + * the result of the operation. * * For behaviour details see g_output_stream_close(). * @@ -937,45 +1284,88 @@ g_output_stream_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GOutputStreamClass *class; - GSimpleAsyncResult *simple; + GTask *task; GError *error = NULL; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); - if (stream->priv->closed) + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_close_async); + g_task_set_priority (task, io_priority); + + if (!g_output_stream_set_pending (stream, &error)) { - simple = g_simple_async_result_new (G_OBJECT (stream), - callback, - user_data, - g_output_stream_close_async); - g_simple_async_result_complete_in_idle (simple); - g_object_unref (simple); + g_task_return_error (task, error); + g_object_unref (task); return; } - if (!g_output_stream_set_pending (stream, &error)) + g_output_stream_internal_close_async (stream, io_priority, cancellable, + real_close_async_cb, task); +} + +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). + */ +void +g_output_stream_internal_close_async (GOutputStream *stream, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task; + + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_source_tag (task, g_output_stream_internal_close_async); + g_task_set_priority (task, io_priority); + + if (stream->priv->closed) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), - callback, - user_data, - error); - g_error_free (error); + g_task_return_boolean (task, TRUE); + g_object_unref (task); return; } - + class = G_OUTPUT_STREAM_GET_CLASS (stream); - stream->priv->outstanding_callback = callback; - g_object_ref (stream); - class->close_async (stream, io_priority, cancellable, - async_ready_close_callback_wrapper, user_data); + stream->priv->closing = TRUE; + + /* Call close_async directly if there is no need to flush, or if the flush + can be done sync (in the output stream async close thread) */ + if (class->flush_async == NULL || + (class->flush_async == g_output_stream_real_flush_async && + (class->flush == NULL || class->close_async == g_output_stream_real_close_async))) + { + class->close_async (stream, io_priority, cancellable, + async_ready_close_callback_wrapper, task); + } + else + { + /* First do an async flush, then do the async close in the callback + wrapper (see async_ready_close_flushed_callback_wrapper) */ + class->flush_async (stream, io_priority, cancellable, + async_ready_close_flushed_callback_wrapper, task); + } +} + +static gboolean +g_output_stream_internal_close_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); } /** * g_output_stream_close_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Closes an output stream. @@ -987,25 +1377,14 @@ g_output_stream_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; - GOutputStreamClass *class; - g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); - - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE); - /* Special case already closed */ - if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async) - return TRUE; - } - - class = G_OUTPUT_STREAM_GET_CLASS (stream); - return class->close_finish (stream, result, error); + /* @result is always the GTask created by g_output_stream_close_async(); + * we called class->close_finish() from async_ready_close_callback_wrapper. + */ + return g_task_propagate_boolean (G_TASK (result), error); } /** @@ -1025,6 +1404,27 @@ g_output_stream_is_closed (GOutputStream *stream) } /** + * g_output_stream_is_closing: + * @stream: a #GOutputStream. + * + * Checks if an output stream is being closed. This can be + * used inside e.g. a flush implementation to see if the + * flush (or other i/o operation) is called from within + * the closing operation. + * + * Returns: %TRUE if @stream is being closed. %FALSE otherwise. + * + * Since: 2.24 + **/ +gboolean +g_output_stream_is_closing (GOutputStream *stream) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE); + + return stream->priv->closing; +} + +/** * g_output_stream_has_pending: * @stream: a #GOutputStream. * @@ -1043,14 +1443,14 @@ g_output_stream_has_pending (GOutputStream *stream) /** * g_output_stream_set_pending: * @stream: a #GOutputStream. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Sets @stream to have actions pending. If the pending flag is * already set or @stream is closed, it will return %FALSE and set * @error. * - * Return value: %TRUE if pending was previously unset and is now set. + * Returns: %TRUE if pending was previously unset and is now set. **/ gboolean g_output_stream_set_pending (GOutputStream *stream, @@ -1060,15 +1460,18 @@ g_output_stream_set_pending (GOutputStream *stream, if (stream->priv->closed) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Stream is already closed")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Stream is already closed")); return FALSE; } if (stream->priv->pending) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING, - _("Stream has outstanding operation")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING, + /* Translators: This is an error you get if there is + * already an operation running against this stream when + * you try to start one */ + _("Stream has outstanding operation")); return FALSE; } @@ -1090,6 +1493,28 @@ g_output_stream_clear_pending (GOutputStream *stream) stream->priv->pending = FALSE; } +/** + * g_output_stream_async_write_is_via_threads: + * @stream: a #GOutputStream. + * + * Checks if an ouput stream's write_async function uses threads. + * + * Returns: %TRUE if @stream's write_async function uses threads. + **/ +gboolean +g_output_stream_async_write_is_via_threads (GOutputStream *stream) +{ + GOutputStreamClass *class; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + return (class->write_async == g_output_stream_real_write_async && + !(G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))); +} + /******************************************** * Default implementation of async ops * @@ -1102,23 +1527,77 @@ typedef struct { } WriteData; static void -write_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +free_write_data (WriteData *op) { - WriteData *op; + g_slice_free (WriteData, op); +} + +static void +write_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + WriteData *op = task_data; GOutputStreamClass *class; GError *error = NULL; + gssize count_written; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + count_written = class->write_fn (stream, op->buffer, op->count_requested, + cancellable, &error); + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); +} + +static void write_async_pollable (GPollableOutputStream *stream, + GTask *task); + +static gboolean +write_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GTask *task = user_data; + + write_async_pollable (stream, task); + return FALSE; +} + +static void +write_async_pollable (GPollableOutputStream *stream, + GTask *task) +{ + GError *error = NULL; + WriteData *op = g_task_get_task_data (task); + gssize count_written; - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (res); - op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested, - cancellable, &error); - if (op->count_written == -1) + if (g_task_return_error_if_cancelled (task)) + return; + + count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, op->buffer, op->count_requested, &error); + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_simple_async_result_set_from_error (res, error); + GSource *source; + g_error_free (error); + + source = g_pollable_output_stream_create_source (stream, + g_task_get_cancellable (task)); + g_task_attach_source (task, source, + (GSourceFunc) write_async_pollable_ready); + g_source_unref (source); + return; } + + if (count_written == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, count_written); } static void @@ -1130,17 +1609,21 @@ g_output_stream_real_write_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; WriteData *op; - op = g_new0 (WriteData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + op = g_slice_new0 (WriteData); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_check_cancellable (task, FALSE); + g_task_set_task_data (task, op, (GDestroyNotify) free_write_data); op->buffer = buffer; op->count_requested = count; - - g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); - g_object_unref (res); + + if (!g_output_stream_async_write_is_via_threads (stream)) + write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task); + else + g_task_run_in_thread (task, write_async_thread); + g_object_unref (task); } static gssize @@ -1148,44 +1631,207 @@ g_output_stream_real_write_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - WriteData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->count_written; + return g_task_propagate_int (G_TASK (result), error); } typedef struct { GInputStream *source; GOutputStreamSpliceFlags flags; - gssize bytes_copied; + gssize n_read; + gssize n_written; + gsize bytes_copied; + GError *error; + guint8 *buffer; } SpliceData; static void -splice_async_thread (GSimpleAsyncResult *result, - GObject *object, - GCancellable *cancellable) +free_splice_data (SpliceData *op) +{ + g_clear_pointer (&op->buffer, g_free); + g_object_unref (op->source); + g_clear_error (&op->error); + g_free (op); +} + +static void +real_splice_async_complete_cb (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE && + !g_input_stream_is_closed (op->source)) + return; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET && + !g_output_stream_is_closed (g_task_get_source_object (task))) + return; + + if (op->error != NULL) + { + g_task_return_error (task, op->error); + op->error = NULL; + } + else + { + g_task_return_int (task, op->bytes_copied); + } + + g_object_unref (task); +} + +static void +real_splice_async_close_input_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = user_data; + + g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_close_output_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + GError **error = (op->error == NULL) ? &op->error : NULL; + + g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error); + + real_splice_async_complete_cb (task); +} + +static void +real_splice_async_complete (GTask *task) +{ + SpliceData *op = g_task_get_task_data (task); + gboolean done = TRUE; + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) + { + done = FALSE; + g_input_stream_close_async (op->source, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_input_cb, task); + } + + if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) + { + done = FALSE; + g_output_stream_internal_close_async (g_task_get_source_object (task), + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_close_output_cb, + task); + } + + if (done) + real_splice_async_complete_cb (task); +} + +static void real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data); + +static void +real_splice_async_write_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) { - SpliceData *op; GOutputStreamClass *class; - GError *error = NULL; - GOutputStream *stream; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; - stream = G_OUTPUT_STREAM (object); - class = G_OUTPUT_STREAM_GET_CLASS (object); - op = g_simple_async_result_get_op_res_gpointer (result); - - op->bytes_copied = class->splice (stream, - op->source, - op->flags, - cancellable, - &error); - if (op->bytes_copied == -1) + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error); + + if (ret == -1) { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); + real_splice_async_complete (task); + return; + } + + op->n_written += ret; + op->bytes_copied += ret; + if (op->bytes_copied > G_MAXSSIZE) + op->bytes_copied = G_MAXSSIZE; + + if (op->n_written < op->n_read) + { + class->write_async (g_task_get_source_object (task), + op->buffer + op->n_written, + op->n_read - op->n_written, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); + return; } + + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); +} + +static void +real_splice_async_read_cb (GObject *source, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStreamClass *class; + GTask *task = G_TASK (user_data); + SpliceData *op = g_task_get_task_data (task); + gssize ret; + + class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task)); + + ret = g_input_stream_read_finish (op->source, res, &op->error); + if (ret == -1 || ret == 0) + { + real_splice_async_complete (task); + return; + } + + op->n_read = ret; + op->n_written = 0; + + class->write_async (g_task_get_source_object (task), op->buffer, + op->n_read, g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_write_cb, task); +} + +static void +splice_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) +{ + GOutputStream *stream = source_object; + SpliceData *op = task_data; + GOutputStreamClass *class; + GError *error = NULL; + gssize bytes_copied; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + bytes_copied = class->splice (stream, + op->source, + op->flags, + cancellable, + &error); + if (bytes_copied == -1) + g_task_return_error (task, error); + else + g_task_return_int (task, bytes_copied); } static void @@ -1197,20 +1843,29 @@ g_output_stream_real_splice_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; SpliceData *op; op = g_new0 (SpliceData, 1); - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data); op->flags = flags; - op->source = source; + op->source = g_object_ref (source); - /* TODO: In the case where both source and destintion have - non-threadbased async calls we can use a true async copy here */ - - g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable); - g_object_unref (res); + if (g_input_stream_async_read_is_via_threads (source) && + g_output_stream_async_write_is_via_threads (stream)) + { + g_task_run_in_thread (task, splice_async_thread); + g_object_unref (task); + } + else + { + op->buffer = g_malloc (8192); + g_input_stream_read_async (op->source, op->buffer, 8192, + g_task_get_priority (task), + g_task_get_cancellable (task), + real_splice_async_read_cb, task); + } } static gssize @@ -1218,34 +1873,32 @@ g_output_stream_real_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - SpliceData *op; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async); - op = g_simple_async_result_get_op_res_gpointer (simple); - return op->bytes_copied; + return g_task_propagate_int (G_TASK (result), error); } static void -flush_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +flush_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; gboolean result; GError *error = NULL; - class = G_OUTPUT_STREAM_GET_CLASS (object); + class = G_OUTPUT_STREAM_GET_CLASS (stream); result = TRUE; if (class->flush) - result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); + result = class->flush (stream, cancellable, &error); - if (!result) - { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); - } + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1255,12 +1908,12 @@ g_output_stream_real_flush_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; + GTask *task; - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - - g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, flush_async_thread); + g_object_unref (task); } static gboolean @@ -1268,30 +1921,52 @@ g_output_stream_real_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - return TRUE; + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); + + return g_task_propagate_boolean (G_TASK (result), error); } static void -close_async_thread (GSimpleAsyncResult *res, - GObject *object, - GCancellable *cancellable) +close_async_thread (GTask *task, + gpointer source_object, + gpointer task_data, + GCancellable *cancellable) { + GOutputStream *stream = source_object; GOutputStreamClass *class; GError *error = NULL; - gboolean result; + gboolean result = TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + /* Do a flush here if there is a flush function, and we did not have to do + * an async flush before (see g_output_stream_close_async) + */ + if (class->flush != NULL && + (class->flush_async == NULL || + class->flush_async == g_output_stream_real_flush_async)) + { + result = class->flush (stream, cancellable, &error); + } /* Auto handling of cancelation disabled, and ignore cancellation, since we want to close things anyway, although possibly in a quick-n-dirty way. At least we never want to leak open handles */ - - class = G_OUTPUT_STREAM_GET_CLASS (object); - result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error); - if (!result) + + if (class->close_fn) { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); + /* Make sure to close, even if the flush failed (see sync close) */ + if (!result) + class->close_fn (stream, cancellable, NULL); + else + result = class->close_fn (stream, cancellable, &error); } + + if (result) + g_task_return_boolean (task, TRUE); + else + g_task_return_error (task, error); } static void @@ -1301,14 +1976,12 @@ g_output_stream_real_close_async (GOutputStream *stream, GAsyncReadyCallback callback, gpointer user_data) { - GSimpleAsyncResult *res; - - res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async); + GTask *task; - g_simple_async_result_set_handle_cancellation (res, FALSE); - - g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable); - g_object_unref (res); + task = g_task_new (stream, cancellable, callback, user_data); + g_task_set_priority (task, io_priority); + g_task_run_in_thread (task, close_async_thread); + g_object_unref (task); } static gboolean @@ -1316,10 +1989,7 @@ g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async); - return TRUE; -} + g_return_val_if_fail (g_task_is_valid (result, stream), FALSE); -#define __G_OUTPUT_STREAM_C__ -#include "gioaliasdef.c" + return g_task_propagate_boolean (G_TASK (result), error); +}