X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gio%2Fgoutputstream.c;h=a619c057e2aa62a8b804074c2619d9ace1afe72c;hb=f8532a13e2054e649f75ca2a58e01604be05549e;hp=62c75a6c82c22dfa9bb0d5e4ee540313900fbf98;hpb=74e95b90d288ba2a5f513b508615869b98c80a66;p=platform%2Fupstream%2Fglib.git diff --git a/gio/goutputstream.c b/gio/goutputstream.c index 62c75a6..a619c05 100644 --- a/gio/goutputstream.c +++ b/gio/goutputstream.c @@ -20,27 +20,37 @@ * Author: Alexander Larsson */ -#include +#include "config.h" #include "goutputstream.h" +#include "gcancellable.h" +#include "gasyncresult.h" #include "gsimpleasyncresult.h" +#include "ginputstream.h" +#include "gioerror.h" #include "glibintl.h" - -#include "gioalias.h" +#include "gpollableoutputstream.h" /** * SECTION:goutputstream * @short_description: Base class for implementing streaming output - * - * + * @include: gio/gio.h + * + * #GOutputStream has functions to write to a stream (g_output_stream_write()), + * to close a stream (g_output_stream_close()) and to flush pending writes + * (g_output_stream_flush()). + * + * To copy the content of an input stream to an output stream without + * manually handling the reads and writes, use g_output_stream_splice(). * + * All of these functions have async variants too. **/ -G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); +G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT); struct _GOutputStreamPrivate { guint closed : 1; guint pending : 1; - guint cancelled : 1; + guint closing : 1; GAsyncReadyCallback outstanding_callback; }; @@ -85,16 +95,14 @@ static void g_output_stream_real_close_async (GOutputStream *s static gboolean g_output_stream_real_close_finish (GOutputStream *stream, GAsyncResult *result, GError **error); +static gboolean _g_output_stream_close_internal (GOutputStream *stream, + GCancellable *cancellable, + GError **error); static void g_output_stream_finalize (GObject *object) { - GOutputStream *stream; - - stream = G_OUTPUT_STREAM (object); - - if (G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) - (*G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) (object); + G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object); } static void @@ -106,9 +114,8 @@ g_output_stream_dispose (GObject *object) if (!stream->priv->closed) g_output_stream_close (stream, NULL, NULL); - - if (G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) - (*G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) (object); + + G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object); } static void @@ -144,31 +151,34 @@ g_output_stream_init (GOutputStream *stream) /** * g_output_stream_write: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. * - * If count is zero returns zero and does nothing. A value of @count + * If count is 0, returns 0 and does nothing. A value of @count * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. * * On success, the number of bytes written to the stream is returned. * It is not an error if this is not the same as the requested size, as it - * can happen e.g. on a partial i/o error, or if the there is not enough - * storage in the stream. All writes either block until at least one byte - * is written, so zero is never returned (unless @count is zero). + * can happen e.g. on a partial I/O error, or if there is not enough + * storage in the stream. All writes block until at least one byte + * is written or an error occurs; 0 is never returned (unless + * @count is 0). * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation - * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an * operation was partially finished when the operation was cancelled the * partial result will be returned, without an error. * * On error -1 is returned and @error is set accordingly. * + * Virtual: write_fn + * * Return value: Number of bytes written, or -1 on error **/ gssize @@ -190,16 +200,16 @@ g_output_stream_write (GOutputStream *stream, if (((gssize) count) < 0) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to g_output_stream_write")); + _("Too large count value passed to %s"), G_STRFUNC); return -1; } class = G_OUTPUT_STREAM_GET_CLASS (stream); - if (class->write == NULL) + if (class->write_fn == NULL) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, - _("Output stream doesn't implement write")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Output stream doesn't implement write")); return -1; } @@ -207,12 +217,12 @@ g_output_stream_write (GOutputStream *stream, return -1; if (cancellable) - g_push_current_cancellable (cancellable); + g_cancellable_push_current (cancellable); - res = class->write (stream, buffer, count, cancellable, error); + res = class->write_fn (stream, buffer, count, cancellable, error); if (cancellable) - g_pop_current_cancellable (cancellable); + g_cancellable_pop_current (cancellable); g_output_stream_clear_pending (stream); @@ -222,12 +232,12 @@ g_output_stream_write (GOutputStream *stream, /** * g_output_stream_write_all: * @stream: a #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write - * @bytes_written: location to store the number of bytes that was + * @bytes_written: (out): location to store the number of bytes that was * written to the stream - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: location to store the error occurring, or %NULL to ignore * * Tries to write @count bytes from @buffer into the stream. Will block * during the operation. @@ -238,9 +248,9 @@ g_output_stream_write (GOutputStream *stream, * On a successful write of @count bytes, %TRUE is returned, and @bytes_written * is set to @count. * - * If there is an error during the operation FALSE is returned and @error + * If there is an error during the operation %FALSE is returned and @error * is set to indicate the error status, @bytes_written is updated to contain - * the number of bytes written into the stream before the error occured. + * the number of bytes written into the stream before the error occurred. * * Return value: %TRUE on success, %FALSE if there was an error **/ @@ -283,13 +293,61 @@ g_output_stream_write_all (GOutputStream *stream, } /** + * g_output_stream_write_bytes: + * @stream: a #GOutputStream. + * @bytes: the #GBytes to write + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore + * + * Tries to write the data from @bytes into the stream. Will block + * during the operation. + * + * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger + * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error. + * + * On success, the number of bytes written to the stream is returned. + * It is not an error if this is not the same as the requested size, as it + * can happen e.g. on a partial I/O error, or if there is not enough + * storage in the stream. All writes block until at least one byte + * is written or an error occurs; 0 is never returned (unless + * the size of @bytes is 0). + * + * If @cancellable is not %NULL, then the operation can be cancelled by + * triggering the cancellable object from another thread. If the operation + * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an + * operation was partially finished when the operation was cancelled the + * partial result will be returned, without an error. + * + * On error -1 is returned and @error is set accordingly. + * + * Return value: Number of bytes written, or -1 on error + **/ +gssize +g_output_stream_write_bytes (GOutputStream *stream, + GBytes *bytes, + GCancellable *cancellable, + GError **error) +{ + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + return g_output_stream_write (stream, + data, size, + cancellable, + error); +} + +/** * g_output_stream_flush: * @stream: a #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * - * Flushed any outstanding buffers in the stream. Will block during - * the operation. Closing the stream will implicitly cause a flush. + * Forces a write of all user-space buffered data for the given + * @stream. Will block during the operation. Closing the stream will + * implicitly cause a flush. * * This function is optional for inherited classes. * @@ -318,12 +376,12 @@ g_output_stream_flush (GOutputStream *stream, if (class->flush) { if (cancellable) - g_push_current_cancellable (cancellable); + g_cancellable_push_current (cancellable); res = class->flush (stream, cancellable, error); if (cancellable) - g_pop_current_cancellable (cancellable); + g_cancellable_pop_current (cancellable); } g_output_stream_clear_pending (stream); @@ -336,13 +394,17 @@ g_output_stream_flush (GOutputStream *stream, * @stream: a #GOutputStream. * @source: a #GInputStream. * @flags: a set of #GOutputStreamSpliceFlags. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @error: a #GError location to store the error occuring, or %NULL to + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Splices an input stream into an output stream. * - * Returns: a #gssize containig the size of the data spliced. + * Returns: a #gssize containing the size of the data spliced, or + * -1 if an error occurred. Note that if the number of bytes + * spliced is greater than %G_MAXSSIZE, then that will be + * returned, and there is no way to determine the actual number + * of bytes spliced. **/ gssize g_output_stream_splice (GOutputStream *stream, @@ -352,35 +414,34 @@ g_output_stream_splice (GOutputStream *stream, GError **error) { GOutputStreamClass *class; - gboolean res; + gssize bytes_copied; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1); if (g_input_stream_is_closed (source)) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Source stream is already closed")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Source stream is already closed")); return -1; } if (!g_output_stream_set_pending (stream, error)) return -1; - + class = G_OUTPUT_STREAM_GET_CLASS (stream); - res = TRUE; if (cancellable) - g_push_current_cancellable (cancellable); - - res = class->splice (stream, source, flags, cancellable, error); - + g_cancellable_push_current (cancellable); + + bytes_copied = class->splice (stream, source, flags, cancellable, error); + if (cancellable) - g_pop_current_cancellable (cancellable); - + g_cancellable_pop_current (cancellable); + g_output_stream_clear_pending (stream); - return res; + return bytes_copied; } static gssize @@ -392,21 +453,21 @@ g_output_stream_real_splice (GOutputStream *stream, { GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream); gssize n_read, n_written; - gssize bytes_copied; + gsize bytes_copied; char buffer[8192], *p; gboolean res; - if (class->write == NULL) + bytes_copied = 0; + if (class->write_fn == NULL) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, - _("Output stream doesn't implement write")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED, + _("Output stream doesn't implement write")); res = FALSE; goto notsupported; } - - bytes_copied = 0; + res = TRUE; - do + do { n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error); if (n_read == -1) @@ -414,14 +475,14 @@ g_output_stream_real_splice (GOutputStream *stream, res = FALSE; break; } - + if (n_read == 0) break; p = buffer; while (n_read > 0) { - n_written = class->write (stream, p, n_read, cancellable, error); + n_written = class->write_fn (stream, p, n_read, cancellable, error); if (n_written == -1) { res = FALSE; @@ -432,6 +493,9 @@ g_output_stream_real_splice (GOutputStream *stream, n_read -= n_written; bytes_copied += n_written; } + + if (bytes_copied > G_MAXSSIZE) + bytes_copied = G_MAXSSIZE; } while (res); @@ -439,31 +503,78 @@ g_output_stream_real_splice (GOutputStream *stream, if (!res) error = NULL; /* Ignore further errors */ - if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_SOURCE) + if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE) { /* Don't care about errors in source here */ g_input_stream_close (source, cancellable, NULL); } - if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_TARGET) + if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET) { /* But write errors on close are bad! */ - if (!class->close (stream, cancellable, error)) - res = FALSE; + res = _g_output_stream_close_internal (stream, cancellable, error); } if (res) return bytes_copied; - + return -1; } +/* Must always be called inside + * g_output_stream_set_pending()/g_output_stream_clear_pending(). */ +static gboolean +_g_output_stream_close_internal (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + GOutputStreamClass *class; + gboolean res; + + if (stream->priv->closed) + return TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + stream->priv->closing = TRUE; + + if (cancellable) + g_cancellable_push_current (cancellable); + + if (class->flush) + res = class->flush (stream, cancellable, error); + else + res = TRUE; + + if (!res) + { + /* flushing caused the error that we want to return, + * but we still want to close the underlying stream if possible + */ + if (class->close_fn) + class->close_fn (stream, cancellable, NULL); + } + else + { + res = TRUE; + if (class->close_fn) + res = class->close_fn (stream, cancellable, error); + } + + if (cancellable) + g_cancellable_pop_current (cancellable); + + stream->priv->closing = FALSE; + stream->priv->closed = TRUE; + + return res; +} /** * g_output_stream_close: * @stream: A #GOutputStream. - * @cancellable: optional cancellable object - * @error: location to store the error occuring, or %NULL to ignore + * @cancellable: (allow-none): optional cancellable object + * @error: location to store the error occurring, or %NULL to ignore * * Closes the stream, releasing resources related to it. * @@ -474,8 +585,8 @@ g_output_stream_real_splice (GOutputStream *stream, * stream. * * Streams will be automatically closed when the last reference - * is dropped, but you might want to call make sure resources - * are released as early as possible. + * is dropped, but you might want to call this function to make sure + * resources are released as early as possible. * * Some streams might keep the backing store of the stream (e.g. a file descriptor) * open after the stream is closed. See the documentation for the individual @@ -483,11 +594,11 @@ g_output_stream_real_splice (GOutputStream *stream, * * On failure the first error that happened will be reported, but the close * operation will finish as much as possible. A stream that failed to - * close will still return %G_IO_ERROR_CLOSED all operations. Still, it + * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it * is important to check and report the error to the user, otherwise * there might be a loss of data as all data might not be written. * - * If @cancellable is not NULL, then the operation can be cancelled by + * If @cancellable is not %NULL, then the operation can be cancelled by * triggering the cancellable object from another thread. If the operation * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. * Cancelling a close will still leave the stream closed, but there some streams @@ -502,46 +613,18 @@ g_output_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error) { - GOutputStreamClass *class; gboolean res; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); - class = G_OUTPUT_STREAM_GET_CLASS (stream); - if (stream->priv->closed) return TRUE; if (!g_output_stream_set_pending (stream, error)) return FALSE; - if (cancellable) - g_push_current_cancellable (cancellable); + res = _g_output_stream_close_internal (stream, cancellable, error); - if (class->flush) - res = class->flush (stream, cancellable, error); - else - res = TRUE; - - if (!res) - { - /* flushing caused the error that we want to return, - * but we still want to close the underlying stream if possible - */ - if (class->close) - class->close (stream, cancellable, NULL); - } - else - { - res = TRUE; - if (class->close) - res = class->close (stream, cancellable, error); - } - - if (cancellable) - g_pop_current_cancellable (cancellable); - - stream->priv->closed = TRUE; g_output_stream_clear_pending (stream); return res; @@ -560,33 +643,100 @@ async_ready_callback_wrapper (GObject *source_object, g_object_unref (stream); } +typedef struct { + gint io_priority; + GCancellable *cancellable; + GError *flush_error; + gpointer user_data; +} CloseUserData; + static void async_ready_close_callback_wrapper (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); + CloseUserData *data = user_data; + stream->priv->closing = FALSE; stream->priv->closed = TRUE; + g_output_stream_clear_pending (stream); + if (stream->priv->outstanding_callback) - (*stream->priv->outstanding_callback) (source_object, res, user_data); + { + if (data->flush_error != NULL) + { + GSimpleAsyncResult *err; + + err = g_simple_async_result_new_take_error (source_object, + stream->priv->outstanding_callback, + data->user_data, + data->flush_error); + data->flush_error = NULL; + + (*stream->priv->outstanding_callback) (source_object, + G_ASYNC_RESULT (err), + data->user_data); + g_object_unref (err); + } + else + { + (*stream->priv->outstanding_callback) (source_object, + res, + data->user_data); + } + } + g_object_unref (stream); + + if (data->cancellable) + g_object_unref (data->cancellable); + + if (data->flush_error) + g_error_free (data->flush_error); + + g_slice_free (CloseUserData, data); +} + +static void +async_ready_close_flushed_callback_wrapper (GObject *source_object, + GAsyncResult *res, + gpointer user_data) +{ + GOutputStream *stream = G_OUTPUT_STREAM (source_object); + GOutputStreamClass *class; + CloseUserData *data = user_data; + GSimpleAsyncResult *simple; + + /* propagate the possible error */ + if (G_IS_SIMPLE_ASYNC_RESULT (res)) + { + simple = G_SIMPLE_ASYNC_RESULT (res); + g_simple_async_result_propagate_error (simple, &data->flush_error); + } + + class = G_OUTPUT_STREAM_GET_CLASS (stream); + + /* we still close, even if there was a flush error */ + class->close_async (stream, data->io_priority, data->cancellable, + async_ready_close_callback_wrapper, user_data); } /** * g_output_stream_write_async: * @stream: A #GOutputStream. - * @buffer: the buffer containing the data to write. + * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. * @count: the number of bytes to write * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Request an asynchronous write of @count bytes from @buffer into - * the stream. When the operation is finished @callback will be called, - * giving the results. + * the stream. When the operation is finished @callback will be called. + * You can then call g_output_stream_write_finish() to get the result of the + * operation. * * During an async request no other sync and async calls are allowed, * and will result in %G_IO_ERROR_PENDING errors. @@ -599,6 +749,10 @@ async_ready_close_callback_wrapper (GObject *source_object, * requested size, as it can happen e.g. on a partial I/O error, * but generally we try to write as many bytes as requested. * + * You are guaranteed that this method will never fail with + * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the + * method will just wait until this changes. + * * Any outstanding I/O request with higher priority (lower numerical * value) will be executed before an outstanding request with lower * priority. Default priority is %G_PRIORITY_DEFAULT. @@ -643,17 +797,17 @@ g_output_stream_write_async (GOutputStream *stream, callback, user_data, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, - _("Too large count value passed to g_output_stream_write_async")); + _("Too large count value passed to %s"), + G_STRFUNC); return; } if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), + g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), callback, user_data, error); - g_error_free (error); return; } @@ -669,7 +823,7 @@ g_output_stream_write_async (GOutputStream *stream, * g_output_stream_write_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes a stream write operation. @@ -687,11 +841,12 @@ g_output_stream_write_finish (GOutputStream *stream, g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); + if (g_async_result_legacy_propagate_error (result, error)) + return -1; + if (G_IS_SIMPLE_ASYNC_RESULT (result)) { simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; /* Special case writes of 0 bytes */ if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async) @@ -702,6 +857,116 @@ g_output_stream_write_finish (GOutputStream *stream, return class->write_finish (stream, result, error); } +static void +write_bytes_callback (GObject *stream, + GAsyncResult *result, + gpointer user_data) +{ + GSimpleAsyncResult *simple = user_data; + GError *error = NULL; + gssize nwrote; + + nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), + result, &error); + if (nwrote == -1) + g_simple_async_result_take_error (simple, error); + else + g_simple_async_result_set_op_res_gssize (simple, nwrote); + g_simple_async_result_complete (simple); + g_object_unref (simple); +} + +/** + * g_output_stream_write_bytes_async: + * @stream: A #GOutputStream. + * @bytes: The bytes to write + * @io_priority: the io priority of the request. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function + * + * Request an asynchronous write of the data in @bytes to the stream. + * When the operation is finished @callback will be called. You can + * then call g_output_stream_write_bytes_finish() to get the result of + * the operation. + * + * During an async request no other sync and async calls are allowed, + * and will result in %G_IO_ERROR_PENDING errors. + * + * A #GBytes larger than %G_MAXSSIZE will cause a + * %G_IO_ERROR_INVALID_ARGUMENT error. + * + * On success, the number of bytes written will be passed to the + * @callback. It is not an error if this is not the same as the + * requested size, as it can happen e.g. on a partial I/O error, + * but generally we try to write as many bytes as requested. + * + * You are guaranteed that this method will never fail with + * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the + * method will just wait until this changes. + * + * Any outstanding I/O request with higher priority (lower numerical + * value) will be executed before an outstanding request with lower + * priority. Default priority is %G_PRIORITY_DEFAULT. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_write_bytes(). + **/ +void +g_output_stream_write_bytes_async (GOutputStream *stream, + GBytes *bytes, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *simple; + gsize size; + gconstpointer data; + + data = g_bytes_get_data (bytes, &size); + + simple = g_simple_async_result_new (G_OBJECT (stream), + callback, user_data, + g_output_stream_write_bytes_async); + g_simple_async_result_set_op_res_gpointer (simple, g_bytes_ref (bytes), + (GDestroyNotify) g_bytes_unref); + + g_output_stream_write_async (stream, + data, size, + io_priority, + cancellable, + write_bytes_callback, + simple); +} + +/** + * g_output_stream_write_bytes_finish: + * @stream: a #GOutputStream. + * @result: a #GAsyncResult. + * @error: a #GError location to store the error occurring, or %NULL to + * ignore. + * + * Finishes a stream write-from-#GBytes operation. + * + * Returns: a #gssize containing the number of bytes written to the stream. + **/ +gssize +g_output_stream_write_bytes_finish (GOutputStream *stream, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); + g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (stream), g_output_stream_write_bytes_async), -1); + + simple = G_SIMPLE_ASYNC_RESULT (result); + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + return g_simple_async_result_get_op_res_gssize (simple); +} + typedef struct { GInputStream *source; gpointer user_data; @@ -732,12 +997,17 @@ async_ready_splice_callback_wrapper (GObject *source_object, * @source: a #GInputStream. * @flags: a set of #GOutputStreamSpliceFlags. * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback. - * @user_data: user data passed to @callback. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback. + * @user_data: (closure): user data passed to @callback. * * Splices a stream asynchronously. - * + * When the operation is finished @callback will be called. + * You can then call g_output_stream_splice_finish() to get the + * result of the operation. + * + * For the synchronous, blocking version of this function, see + * g_output_stream_splice(). **/ void g_output_stream_splice_async (GOutputStream *stream, @@ -767,11 +1037,10 @@ g_output_stream_splice_async (GOutputStream *stream, if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), + g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), callback, user_data, error); - g_error_free (error); return; } @@ -791,30 +1060,28 @@ g_output_stream_splice_async (GOutputStream *stream, * g_output_stream_splice_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes an asynchronous stream splice operation. * - * Returns: a #gssize of the number of bytes spliced. + * Returns: a #gssize of the number of bytes spliced. Note that if the + * number of bytes spliced is greater than %G_MAXSSIZE, then that + * will be returned, and there is no way to determine the actual + * number of bytes spliced. **/ gssize g_output_stream_splice_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { - GSimpleAsyncResult *simple; GOutputStreamClass *class; g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1); g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1); - if (G_IS_SIMPLE_ASYNC_RESULT (result)) - { - simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return -1; - } + if (g_async_result_legacy_propagate_error (result, error)) + return -1; class = G_OUTPUT_STREAM_GET_CLASS (stream); return class->splice_finish (stream, result, error); @@ -824,12 +1091,17 @@ g_output_stream_splice_finish (GOutputStream *stream, * g_output_stream_flush_async: * @stream: a #GOutputStream. * @io_priority: the io priority of the request. - * @cancellable: optional #GCancellable object, %NULL to ignore. - * @callback: a #GAsyncReadyCallback to call when the request is satisfied - * @user_data: the data to pass to callback function - * - * Flushes a stream asynchronously. + * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. + * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * + * Forces an asynchronous write of all user-space buffered data for + * the given @stream. + * For behaviour details see g_output_stream_flush(). + * + * When the operation is finished @callback will be + * called. You can then call g_output_stream_flush_finish() to get the + * result of the operation. **/ void g_output_stream_flush_async (GOutputStream *stream, @@ -846,11 +1118,10 @@ g_output_stream_flush_async (GOutputStream *stream, if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), + g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), callback, user_data, error); - g_error_free (error); return; } @@ -878,12 +1149,12 @@ g_output_stream_flush_async (GOutputStream *stream, * g_output_stream_flush_finish: * @stream: a #GOutputStream. * @result: a GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Finishes flushing an output stream. * - * Returns: %TRUE if flush operation suceeded, %FALSE otherwise. + * Returns: %TRUE if flush operation succeeded, %FALSE otherwise. **/ gboolean g_output_stream_flush_finish (GOutputStream *stream, @@ -896,11 +1167,12 @@ g_output_stream_flush_finish (GOutputStream *stream, g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + if (g_async_result_legacy_propagate_error (result, error)) + return FALSE; + if (G_IS_SIMPLE_ASYNC_RESULT (result)) { simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; /* Special case default implementation */ if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async) @@ -916,13 +1188,14 @@ g_output_stream_flush_finish (GOutputStream *stream, * g_output_stream_close_async: * @stream: A #GOutputStream. * @io_priority: the io priority of the request. - * @callback: callback to call when the request is satisfied - * @user_data: the data to pass to callback function - * @cancellable: optional cancellable object + * @cancellable: (allow-none): optional cancellable object + * @callback: (scope async): callback to call when the request is satisfied + * @user_data: (closure): the data to pass to callback function * * Requests an asynchronous close of the stream, releasing resources * related to it. When the operation is finished @callback will be - * called, giving the results. + * called. You can then call g_output_stream_close_finish() to get + * the result of the operation. * * For behaviour details see g_output_stream_close(). * @@ -940,6 +1213,7 @@ g_output_stream_close_async (GOutputStream *stream, GOutputStreamClass *class; GSimpleAsyncResult *simple; GError *error = NULL; + CloseUserData *data; g_return_if_fail (G_IS_OUTPUT_STREAM (stream)); @@ -956,26 +1230,49 @@ g_output_stream_close_async (GOutputStream *stream, if (!g_output_stream_set_pending (stream, &error)) { - g_simple_async_report_gerror_in_idle (G_OBJECT (stream), + g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream), callback, user_data, error); - g_error_free (error); return; } class = G_OUTPUT_STREAM_GET_CLASS (stream); + stream->priv->closing = TRUE; stream->priv->outstanding_callback = callback; g_object_ref (stream); - class->close_async (stream, io_priority, cancellable, - async_ready_close_callback_wrapper, user_data); + + data = g_slice_new0 (CloseUserData); + + if (cancellable != NULL) + data->cancellable = g_object_ref (cancellable); + + data->io_priority = io_priority; + data->user_data = user_data; + + /* Call close_async directly if there is no need to flush, or if the flush + can be done sync (in the output stream async close thread) */ + if (class->flush_async == NULL || + (class->flush_async == g_output_stream_real_flush_async && + (class->flush == NULL || class->close_async == g_output_stream_real_close_async))) + { + class->close_async (stream, io_priority, cancellable, + async_ready_close_callback_wrapper, data); + } + else + { + /* First do an async flush, then do the async close in the callback + wrapper (see async_ready_close_flushed_callback_wrapper) */ + class->flush_async (stream, io_priority, cancellable, + async_ready_close_flushed_callback_wrapper, data); + } } /** * g_output_stream_close_finish: * @stream: a #GOutputStream. * @result: a #GAsyncResult. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Closes an output stream. @@ -993,11 +1290,12 @@ g_output_stream_close_finish (GOutputStream *stream, g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE); g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE); + if (g_async_result_legacy_propagate_error (result, error)) + return FALSE; + if (G_IS_SIMPLE_ASYNC_RESULT (result)) { simple = G_SIMPLE_ASYNC_RESULT (result); - if (g_simple_async_result_propagate_error (simple, error)) - return FALSE; /* Special case already closed */ if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async) @@ -1025,6 +1323,27 @@ g_output_stream_is_closed (GOutputStream *stream) } /** + * g_output_stream_is_closing: + * @stream: a #GOutputStream. + * + * Checks if an output stream is being closed. This can be + * used inside e.g. a flush implementation to see if the + * flush (or other i/o operation) is called from within + * the closing operation. + * + * Returns: %TRUE if @stream is being closed. %FALSE otherwise. + * + * Since: 2.24 + **/ +gboolean +g_output_stream_is_closing (GOutputStream *stream) +{ + g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE); + + return stream->priv->closing; +} + +/** * g_output_stream_has_pending: * @stream: a #GOutputStream. * @@ -1043,7 +1362,7 @@ g_output_stream_has_pending (GOutputStream *stream) /** * g_output_stream_set_pending: * @stream: a #GOutputStream. - * @error: a #GError location to store the error occuring, or %NULL to + * @error: a #GError location to store the error occurring, or %NULL to * ignore. * * Sets @stream to have actions pending. If the pending flag is @@ -1060,15 +1379,18 @@ g_output_stream_set_pending (GOutputStream *stream, if (stream->priv->closed) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED, - _("Stream is already closed")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, + _("Stream is already closed")); return FALSE; } if (stream->priv->pending) { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING, - _("Stream has outstanding operation")); + g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING, + /* Translators: This is an error you get if there is + * already an operation running against this stream when + * you try to start one */ + _("Stream has outstanding operation")); return FALSE; } @@ -1099,9 +1421,21 @@ typedef struct { const void *buffer; gsize count_requested; gssize count_written; + + GCancellable *cancellable; + gint io_priority; + gboolean need_idle; } WriteData; static void +free_write_data (WriteData *op) +{ + if (op->cancellable) + g_object_unref (op->cancellable); + g_slice_free (WriteData, op); +} + +static void write_async_thread (GSimpleAsyncResult *res, GObject *object, GCancellable *cancellable) @@ -1112,13 +1446,64 @@ write_async_thread (GSimpleAsyncResult *res, class = G_OUTPUT_STREAM_GET_CLASS (object); op = g_simple_async_result_get_op_res_gpointer (res); - op->count_written = class->write (G_OUTPUT_STREAM (object), op->buffer, op->count_requested, - cancellable, &error); + op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested, + cancellable, &error); if (op->count_written == -1) + g_simple_async_result_take_error (res, error); +} + +static void write_async_pollable (GPollableOutputStream *stream, + GSimpleAsyncResult *result); + +static gboolean +write_async_pollable_ready (GPollableOutputStream *stream, + gpointer user_data) +{ + GSimpleAsyncResult *result = user_data; + + write_async_pollable (stream, result); + return FALSE; +} + +static void +write_async_pollable (GPollableOutputStream *stream, + GSimpleAsyncResult *result) +{ + GError *error = NULL; + WriteData *op = g_simple_async_result_get_op_res_gpointer (result); + + if (g_cancellable_set_error_if_cancelled (op->cancellable, &error)) + op->count_written = -1; + else + { + op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)-> + write_nonblocking (stream, op->buffer, op->count_requested, &error); + } + + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_simple_async_result_set_from_error (res, error); + GSource *source; + g_error_free (error); + op->need_idle = FALSE; + + source = g_pollable_output_stream_create_source (stream, op->cancellable); + g_source_set_callback (source, + (GSourceFunc) write_async_pollable_ready, + g_object_ref (result), g_object_unref); + g_source_set_priority (source, op->io_priority); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + return; } + + if (op->count_written == -1) + g_simple_async_result_take_error (result, error); + + if (op->need_idle) + g_simple_async_result_complete_in_idle (result); + else + g_simple_async_result_complete (result); } static void @@ -1133,13 +1518,20 @@ g_output_stream_real_write_async (GOutputStream *stream, GSimpleAsyncResult *res; WriteData *op; - op = g_new0 (WriteData, 1); + op = g_slice_new0 (WriteData); res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async); - g_simple_async_result_set_op_res_gpointer (res, op, g_free); + g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_write_data); op->buffer = buffer; op->count_requested = count; + op->cancellable = cancellable ? g_object_ref (cancellable) : NULL; + op->io_priority = io_priority; + op->need_idle = TRUE; - g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); + if (G_IS_POLLABLE_OUTPUT_STREAM (stream) && + g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))) + write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res); + else + g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable); g_object_unref (res); } @@ -1151,7 +1543,11 @@ g_output_stream_real_write_finish (GOutputStream *stream, GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); WriteData *op; - g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + op = g_simple_async_result_get_op_res_gpointer (simple); return op->count_written; } @@ -1182,10 +1578,7 @@ splice_async_thread (GSimpleAsyncResult *result, cancellable, &error); if (op->bytes_copied == -1) - { - g_simple_async_result_set_from_error (result, error); - g_error_free (error); - } + g_simple_async_result_take_error (result, error); } static void @@ -1221,7 +1614,11 @@ g_output_stream_real_splice_finish (GOutputStream *stream, GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); SpliceData *op; - g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + op = g_simple_async_result_get_op_res_gpointer (simple); return op->bytes_copied; } @@ -1242,10 +1639,7 @@ flush_async_thread (GSimpleAsyncResult *res, result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); if (!result) - { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); - } + g_simple_async_result_take_error (res, error); } static void @@ -1268,6 +1662,10 @@ g_output_stream_real_flush_finish (GOutputStream *stream, GAsyncResult *result, GError **error) { + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; return TRUE; } @@ -1278,19 +1676,34 @@ close_async_thread (GSimpleAsyncResult *res, { GOutputStreamClass *class; GError *error = NULL; - gboolean result; + gboolean result = TRUE; + + class = G_OUTPUT_STREAM_GET_CLASS (object); + + /* Do a flush here if there is a flush function, and we did not have to do + an async flush before (see g_output_stream_close_async) */ + if (class->flush != NULL && + (class->flush_async == NULL || + class->flush_async == g_output_stream_real_flush_async)) + { + result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error); + } /* Auto handling of cancelation disabled, and ignore cancellation, since we want to close things anyway, although possibly in a quick-n-dirty way. At least we never want to leak open handles */ - class = G_OUTPUT_STREAM_GET_CLASS (object); - result = class->close (G_OUTPUT_STREAM (object), cancellable, &error); - if (!result) + if (class->close_fn) { - g_simple_async_result_set_from_error (res, error); - g_error_free (error); + /* Make sure to close, even if the flush failed (see sync close) */ + if (!result) + class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL); + else + result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error); + + if (!result) + g_simple_async_result_take_error (res, error); } } @@ -1317,9 +1730,10 @@ g_output_stream_real_close_finish (GOutputStream *stream, GError **error) { GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); - g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async); + + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async); + + if (g_simple_async_result_propagate_error (simple, error)) + return FALSE; return TRUE; } - -#define __G_OUTPUT_STREAM_C__ -#include "gioaliasdef.c"