1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Alexander Larsson <alexl@redhat.com>
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
28 #include "ginputstream.h"
31 #include "gpollableoutputstream.h"
34 * SECTION:goutputstream
35 * @short_description: Base class for implementing streaming output
38 * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39 * to close a stream (g_output_stream_close()) and to flush pending writes
40 * (g_output_stream_flush()).
42 * To copy the content of an input stream to an output stream without
43 * manually handling the reads and writes, use g_output_stream_splice().
45 * All of these functions have async variants too.
48 struct _GOutputStreamPrivate {
52 GAsyncReadyCallback outstanding_callback;
55 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
57 static gssize g_output_stream_real_splice (GOutputStream *stream,
59 GOutputStreamSpliceFlags flags,
60 GCancellable *cancellable,
62 static void g_output_stream_real_write_async (GOutputStream *stream,
66 GCancellable *cancellable,
67 GAsyncReadyCallback callback,
69 static gssize g_output_stream_real_write_finish (GOutputStream *stream,
72 static void g_output_stream_real_splice_async (GOutputStream *stream,
74 GOutputStreamSpliceFlags flags,
76 GCancellable *cancellable,
77 GAsyncReadyCallback callback,
79 static gssize g_output_stream_real_splice_finish (GOutputStream *stream,
82 static void g_output_stream_real_flush_async (GOutputStream *stream,
84 GCancellable *cancellable,
85 GAsyncReadyCallback callback,
87 static gboolean g_output_stream_real_flush_finish (GOutputStream *stream,
90 static void g_output_stream_real_close_async (GOutputStream *stream,
92 GCancellable *cancellable,
93 GAsyncReadyCallback callback,
95 static gboolean g_output_stream_real_close_finish (GOutputStream *stream,
98 static gboolean _g_output_stream_close_internal (GOutputStream *stream,
99 GCancellable *cancellable,
103 g_output_stream_dispose (GObject *object)
105 GOutputStream *stream;
107 stream = G_OUTPUT_STREAM (object);
109 if (!stream->priv->closed)
110 g_output_stream_close (stream, NULL, NULL);
112 G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
116 g_output_stream_class_init (GOutputStreamClass *klass)
118 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
120 gobject_class->dispose = g_output_stream_dispose;
122 klass->splice = g_output_stream_real_splice;
124 klass->write_async = g_output_stream_real_write_async;
125 klass->write_finish = g_output_stream_real_write_finish;
126 klass->splice_async = g_output_stream_real_splice_async;
127 klass->splice_finish = g_output_stream_real_splice_finish;
128 klass->flush_async = g_output_stream_real_flush_async;
129 klass->flush_finish = g_output_stream_real_flush_finish;
130 klass->close_async = g_output_stream_real_close_async;
131 klass->close_finish = g_output_stream_real_close_finish;
135 g_output_stream_init (GOutputStream *stream)
137 stream->priv = g_output_stream_get_instance_private (stream);
141 * g_output_stream_write:
142 * @stream: a #GOutputStream.
143 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
144 * @count: the number of bytes to write
145 * @cancellable: (allow-none): optional cancellable object
146 * @error: location to store the error occurring, or %NULL to ignore
148 * Tries to write @count bytes from @buffer into the stream. Will block
149 * during the operation.
151 * If count is 0, returns 0 and does nothing. A value of @count
152 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
154 * On success, the number of bytes written to the stream is returned.
155 * It is not an error if this is not the same as the requested size, as it
156 * can happen e.g. on a partial I/O error, or if there is not enough
157 * storage in the stream. All writes block until at least one byte
158 * is written or an error occurs; 0 is never returned (unless
161 * If @cancellable is not %NULL, then the operation can be cancelled by
162 * triggering the cancellable object from another thread. If the operation
163 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
164 * operation was partially finished when the operation was cancelled the
165 * partial result will be returned, without an error.
167 * On error -1 is returned and @error is set accordingly.
171 * Return value: Number of bytes written, or -1 on error
174 g_output_stream_write (GOutputStream *stream,
177 GCancellable *cancellable,
180 GOutputStreamClass *class;
183 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
184 g_return_val_if_fail (buffer != NULL, 0);
189 if (((gssize) count) < 0)
191 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
192 _("Too large count value passed to %s"), G_STRFUNC);
196 class = G_OUTPUT_STREAM_GET_CLASS (stream);
198 if (class->write_fn == NULL)
200 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
201 _("Output stream doesn't implement write"));
205 if (!g_output_stream_set_pending (stream, error))
209 g_cancellable_push_current (cancellable);
211 res = class->write_fn (stream, buffer, count, cancellable, error);
214 g_cancellable_pop_current (cancellable);
216 g_output_stream_clear_pending (stream);
222 * g_output_stream_write_all:
223 * @stream: a #GOutputStream.
224 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
225 * @count: the number of bytes to write
226 * @bytes_written: (out): location to store the number of bytes that was
227 * written to the stream
228 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
229 * @error: location to store the error occurring, or %NULL to ignore
231 * Tries to write @count bytes from @buffer into the stream. Will block
232 * during the operation.
234 * This function is similar to g_output_stream_write(), except it tries to
235 * write as many bytes as requested, only stopping on an error.
237 * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
240 * If there is an error during the operation %FALSE is returned and @error
241 * is set to indicate the error status, @bytes_written is updated to contain
242 * the number of bytes written into the stream before the error occurred.
244 * Return value: %TRUE on success, %FALSE if there was an error
247 g_output_stream_write_all (GOutputStream *stream,
250 gsize *bytes_written,
251 GCancellable *cancellable,
254 gsize _bytes_written;
257 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
258 g_return_val_if_fail (buffer != NULL, FALSE);
261 while (_bytes_written < count)
263 res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
268 *bytes_written = _bytes_written;
273 g_warning ("Write returned zero without error");
275 _bytes_written += res;
279 *bytes_written = _bytes_written;
285 * g_output_stream_write_bytes:
286 * @stream: a #GOutputStream.
287 * @bytes: the #GBytes to write
288 * @cancellable: (allow-none): optional cancellable object
289 * @error: location to store the error occurring, or %NULL to ignore
291 * Tries to write the data from @bytes into the stream. Will block
292 * during the operation.
294 * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
295 * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
297 * On success, the number of bytes written to the stream is returned.
298 * It is not an error if this is not the same as the requested size, as it
299 * can happen e.g. on a partial I/O error, or if there is not enough
300 * storage in the stream. All writes block until at least one byte
301 * is written or an error occurs; 0 is never returned (unless
302 * the size of @bytes is 0).
304 * If @cancellable is not %NULL, then the operation can be cancelled by
305 * triggering the cancellable object from another thread. If the operation
306 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
307 * operation was partially finished when the operation was cancelled the
308 * partial result will be returned, without an error.
310 * On error -1 is returned and @error is set accordingly.
312 * Return value: Number of bytes written, or -1 on error
315 g_output_stream_write_bytes (GOutputStream *stream,
317 GCancellable *cancellable,
323 data = g_bytes_get_data (bytes, &size);
325 return g_output_stream_write (stream,
332 * g_output_stream_flush:
333 * @stream: a #GOutputStream.
334 * @cancellable: (allow-none): optional cancellable object
335 * @error: location to store the error occurring, or %NULL to ignore
337 * Forces a write of all user-space buffered data for the given
338 * @stream. Will block during the operation. Closing the stream will
339 * implicitly cause a flush.
341 * This function is optional for inherited classes.
343 * If @cancellable is not %NULL, then the operation can be cancelled by
344 * triggering the cancellable object from another thread. If the operation
345 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
347 * Return value: %TRUE on success, %FALSE on error
350 g_output_stream_flush (GOutputStream *stream,
351 GCancellable *cancellable,
354 GOutputStreamClass *class;
357 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
359 if (!g_output_stream_set_pending (stream, error))
362 class = G_OUTPUT_STREAM_GET_CLASS (stream);
368 g_cancellable_push_current (cancellable);
370 res = class->flush (stream, cancellable, error);
373 g_cancellable_pop_current (cancellable);
376 g_output_stream_clear_pending (stream);
382 * g_output_stream_splice:
383 * @stream: a #GOutputStream.
384 * @source: a #GInputStream.
385 * @flags: a set of #GOutputStreamSpliceFlags.
386 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
387 * @error: a #GError location to store the error occurring, or %NULL to
390 * Splices an input stream into an output stream.
392 * Returns: a #gssize containing the size of the data spliced, or
393 * -1 if an error occurred. Note that if the number of bytes
394 * spliced is greater than %G_MAXSSIZE, then that will be
395 * returned, and there is no way to determine the actual number
399 g_output_stream_splice (GOutputStream *stream,
400 GInputStream *source,
401 GOutputStreamSpliceFlags flags,
402 GCancellable *cancellable,
405 GOutputStreamClass *class;
408 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
409 g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
411 if (g_input_stream_is_closed (source))
413 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
414 _("Source stream is already closed"));
418 if (!g_output_stream_set_pending (stream, error))
421 class = G_OUTPUT_STREAM_GET_CLASS (stream);
424 g_cancellable_push_current (cancellable);
426 bytes_copied = class->splice (stream, source, flags, cancellable, error);
429 g_cancellable_pop_current (cancellable);
431 g_output_stream_clear_pending (stream);
437 g_output_stream_real_splice (GOutputStream *stream,
438 GInputStream *source,
439 GOutputStreamSpliceFlags flags,
440 GCancellable *cancellable,
443 GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
444 gssize n_read, n_written;
446 char buffer[8192], *p;
450 if (class->write_fn == NULL)
452 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
453 _("Output stream doesn't implement write"));
461 n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
474 n_written = class->write_fn (stream, p, n_read, cancellable, error);
483 bytes_copied += n_written;
486 if (bytes_copied > G_MAXSSIZE)
487 bytes_copied = G_MAXSSIZE;
493 error = NULL; /* Ignore further errors */
495 if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
497 /* Don't care about errors in source here */
498 g_input_stream_close (source, cancellable, NULL);
501 if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
503 /* But write errors on close are bad! */
504 res = _g_output_stream_close_internal (stream, cancellable, error);
513 /* Must always be called inside
514 * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
516 _g_output_stream_close_internal (GOutputStream *stream,
517 GCancellable *cancellable,
520 GOutputStreamClass *class;
523 if (stream->priv->closed)
526 class = G_OUTPUT_STREAM_GET_CLASS (stream);
528 stream->priv->closing = TRUE;
531 g_cancellable_push_current (cancellable);
534 res = class->flush (stream, cancellable, error);
540 /* flushing caused the error that we want to return,
541 * but we still want to close the underlying stream if possible
544 class->close_fn (stream, cancellable, NULL);
550 res = class->close_fn (stream, cancellable, error);
554 g_cancellable_pop_current (cancellable);
556 stream->priv->closing = FALSE;
557 stream->priv->closed = TRUE;
563 * g_output_stream_close:
564 * @stream: A #GOutputStream.
565 * @cancellable: (allow-none): optional cancellable object
566 * @error: location to store the error occurring, or %NULL to ignore
568 * Closes the stream, releasing resources related to it.
570 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
571 * Closing a stream multiple times will not return an error.
573 * Closing a stream will automatically flush any outstanding buffers in the
576 * Streams will be automatically closed when the last reference
577 * is dropped, but you might want to call this function to make sure
578 * resources are released as early as possible.
580 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
581 * open after the stream is closed. See the documentation for the individual
582 * stream for details.
584 * On failure the first error that happened will be reported, but the close
585 * operation will finish as much as possible. A stream that failed to
586 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
587 * is important to check and report the error to the user, otherwise
588 * there might be a loss of data as all data might not be written.
590 * If @cancellable is not %NULL, then the operation can be cancelled by
591 * triggering the cancellable object from another thread. If the operation
592 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
593 * Cancelling a close will still leave the stream closed, but there some streams
594 * can use a faster close that doesn't block to e.g. check errors. On
595 * cancellation (as with any error) there is no guarantee that all written
596 * data will reach the target.
598 * Return value: %TRUE on success, %FALSE on failure
601 g_output_stream_close (GOutputStream *stream,
602 GCancellable *cancellable,
607 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
609 if (stream->priv->closed)
612 if (!g_output_stream_set_pending (stream, error))
615 res = _g_output_stream_close_internal (stream, cancellable, error);
617 g_output_stream_clear_pending (stream);
623 async_ready_write_callback_wrapper (GObject *source_object,
627 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
628 GOutputStreamClass *class;
629 GTask *task = user_data;
631 GError *error = NULL;
633 g_output_stream_clear_pending (stream);
635 if (g_async_result_legacy_propagate_error (res, &error))
639 class = G_OUTPUT_STREAM_GET_CLASS (stream);
640 nwrote = class->write_finish (stream, res, &error);
644 g_task_return_int (task, nwrote);
646 g_task_return_error (task, error);
647 g_object_unref (task);
651 * g_output_stream_write_async:
652 * @stream: A #GOutputStream.
653 * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write.
654 * @count: the number of bytes to write
655 * @io_priority: the io priority of the request.
656 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
657 * @callback: (scope async): callback to call when the request is satisfied
658 * @user_data: (closure): the data to pass to callback function
660 * Request an asynchronous write of @count bytes from @buffer into
661 * the stream. When the operation is finished @callback will be called.
662 * You can then call g_output_stream_write_finish() to get the result of the
665 * During an async request no other sync and async calls are allowed,
666 * and will result in %G_IO_ERROR_PENDING errors.
668 * A value of @count larger than %G_MAXSSIZE will cause a
669 * %G_IO_ERROR_INVALID_ARGUMENT error.
671 * On success, the number of bytes written will be passed to the
672 * @callback. It is not an error if this is not the same as the
673 * requested size, as it can happen e.g. on a partial I/O error,
674 * but generally we try to write as many bytes as requested.
676 * You are guaranteed that this method will never fail with
677 * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
678 * method will just wait until this changes.
680 * Any outstanding I/O request with higher priority (lower numerical
681 * value) will be executed before an outstanding request with lower
682 * priority. Default priority is %G_PRIORITY_DEFAULT.
684 * The asyncronous methods have a default fallback that uses threads
685 * to implement asynchronicity, so they are optional for inheriting
686 * classes. However, if you override one you must override all.
688 * For the synchronous, blocking version of this function, see
689 * g_output_stream_write().
692 g_output_stream_write_async (GOutputStream *stream,
696 GCancellable *cancellable,
697 GAsyncReadyCallback callback,
700 GOutputStreamClass *class;
701 GError *error = NULL;
704 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
705 g_return_if_fail (buffer != NULL);
707 task = g_task_new (stream, cancellable, callback, user_data);
708 g_task_set_source_tag (task, g_output_stream_write_async);
709 g_task_set_priority (task, io_priority);
713 g_task_return_int (task, 0);
714 g_object_unref (task);
718 if (((gssize) count) < 0)
720 g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
721 _("Too large count value passed to %s"),
723 g_object_unref (task);
727 if (!g_output_stream_set_pending (stream, &error))
729 g_task_return_error (task, error);
730 g_object_unref (task);
734 class = G_OUTPUT_STREAM_GET_CLASS (stream);
736 class->write_async (stream, buffer, count, io_priority, cancellable,
737 async_ready_write_callback_wrapper, task);
741 * g_output_stream_write_finish:
742 * @stream: a #GOutputStream.
743 * @result: a #GAsyncResult.
744 * @error: a #GError location to store the error occurring, or %NULL to
747 * Finishes a stream write operation.
749 * Returns: a #gssize containing the number of bytes written to the stream.
752 g_output_stream_write_finish (GOutputStream *stream,
753 GAsyncResult *result,
756 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
757 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
758 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
760 /* @result is always the GTask created by g_output_stream_write_async();
761 * we called class->write_finish() from async_ready_write_callback_wrapper.
763 return g_task_propagate_int (G_TASK (result), error);
767 write_bytes_callback (GObject *stream,
768 GAsyncResult *result,
771 GTask *task = user_data;
772 GError *error = NULL;
775 nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
778 g_task_return_error (task, error);
780 g_task_return_int (task, nwrote);
781 g_object_unref (task);
785 * g_output_stream_write_bytes_async:
786 * @stream: A #GOutputStream.
787 * @bytes: The bytes to write
788 * @io_priority: the io priority of the request.
789 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
790 * @callback: (scope async): callback to call when the request is satisfied
791 * @user_data: (closure): the data to pass to callback function
793 * Request an asynchronous write of the data in @bytes to the stream.
794 * When the operation is finished @callback will be called. You can
795 * then call g_output_stream_write_bytes_finish() to get the result of
798 * During an async request no other sync and async calls are allowed,
799 * and will result in %G_IO_ERROR_PENDING errors.
801 * A #GBytes larger than %G_MAXSSIZE will cause a
802 * %G_IO_ERROR_INVALID_ARGUMENT error.
804 * On success, the number of bytes written will be passed to the
805 * @callback. It is not an error if this is not the same as the
806 * requested size, as it can happen e.g. on a partial I/O error,
807 * but generally we try to write as many bytes as requested.
809 * You are guaranteed that this method will never fail with
810 * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
811 * method will just wait until this changes.
813 * Any outstanding I/O request with higher priority (lower numerical
814 * value) will be executed before an outstanding request with lower
815 * priority. Default priority is %G_PRIORITY_DEFAULT.
817 * For the synchronous, blocking version of this function, see
818 * g_output_stream_write_bytes().
821 g_output_stream_write_bytes_async (GOutputStream *stream,
824 GCancellable *cancellable,
825 GAsyncReadyCallback callback,
832 data = g_bytes_get_data (bytes, &size);
834 task = g_task_new (stream, cancellable, callback, user_data);
835 g_task_set_task_data (task, g_bytes_ref (bytes),
836 (GDestroyNotify) g_bytes_unref);
838 g_output_stream_write_async (stream,
842 write_bytes_callback,
847 * g_output_stream_write_bytes_finish:
848 * @stream: a #GOutputStream.
849 * @result: a #GAsyncResult.
850 * @error: a #GError location to store the error occurring, or %NULL to
853 * Finishes a stream write-from-#GBytes operation.
855 * Returns: a #gssize containing the number of bytes written to the stream.
858 g_output_stream_write_bytes_finish (GOutputStream *stream,
859 GAsyncResult *result,
862 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
863 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
865 return g_task_propagate_int (G_TASK (result), error);
869 async_ready_splice_callback_wrapper (GObject *source_object,
873 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
874 GOutputStreamClass *class;
877 GError *error = NULL;
879 g_output_stream_clear_pending (stream);
881 if (g_async_result_legacy_propagate_error (res, &error))
885 class = G_OUTPUT_STREAM_GET_CLASS (stream);
886 nspliced = class->splice_finish (stream, res, &error);
890 g_task_return_int (task, nspliced);
892 g_task_return_error (task, error);
893 g_object_unref (task);
897 * g_output_stream_splice_async:
898 * @stream: a #GOutputStream.
899 * @source: a #GInputStream.
900 * @flags: a set of #GOutputStreamSpliceFlags.
901 * @io_priority: the io priority of the request.
902 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
903 * @callback: (scope async): a #GAsyncReadyCallback.
904 * @user_data: (closure): user data passed to @callback.
906 * Splices a stream asynchronously.
907 * When the operation is finished @callback will be called.
908 * You can then call g_output_stream_splice_finish() to get the
909 * result of the operation.
911 * For the synchronous, blocking version of this function, see
912 * g_output_stream_splice().
915 g_output_stream_splice_async (GOutputStream *stream,
916 GInputStream *source,
917 GOutputStreamSpliceFlags flags,
919 GCancellable *cancellable,
920 GAsyncReadyCallback callback,
923 GOutputStreamClass *class;
925 GError *error = NULL;
927 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
928 g_return_if_fail (G_IS_INPUT_STREAM (source));
930 task = g_task_new (stream, cancellable, callback, user_data);
931 g_task_set_source_tag (task, g_output_stream_splice_async);
932 g_task_set_priority (task, io_priority);
933 g_task_set_task_data (task, g_object_ref (source), g_object_unref);
935 if (g_input_stream_is_closed (source))
937 g_task_return_new_error (task,
938 G_IO_ERROR, G_IO_ERROR_CLOSED,
939 _("Source stream is already closed"));
940 g_object_unref (task);
944 if (!g_output_stream_set_pending (stream, &error))
946 g_task_return_error (task, error);
947 g_object_unref (task);
951 class = G_OUTPUT_STREAM_GET_CLASS (stream);
953 class->splice_async (stream, source, flags, io_priority, cancellable,
954 async_ready_splice_callback_wrapper, task);
958 * g_output_stream_splice_finish:
959 * @stream: a #GOutputStream.
960 * @result: a #GAsyncResult.
961 * @error: a #GError location to store the error occurring, or %NULL to
964 * Finishes an asynchronous stream splice operation.
966 * Returns: a #gssize of the number of bytes spliced. Note that if the
967 * number of bytes spliced is greater than %G_MAXSSIZE, then that
968 * will be returned, and there is no way to determine the actual
969 * number of bytes spliced.
972 g_output_stream_splice_finish (GOutputStream *stream,
973 GAsyncResult *result,
976 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
977 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
978 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
980 /* @result is always the GTask created by g_output_stream_splice_async();
981 * we called class->splice_finish() from async_ready_splice_callback_wrapper.
983 return g_task_propagate_int (G_TASK (result), error);
987 async_ready_flush_callback_wrapper (GObject *source_object,
991 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
992 GOutputStreamClass *class;
993 GTask *task = user_data;
995 GError *error = NULL;
997 g_output_stream_clear_pending (stream);
999 if (g_async_result_legacy_propagate_error (res, &error))
1003 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1004 flushed = class->flush_finish (stream, res, &error);
1008 g_task_return_boolean (task, TRUE);
1010 g_task_return_error (task, error);
1011 g_object_unref (task);
1015 * g_output_stream_flush_async:
1016 * @stream: a #GOutputStream.
1017 * @io_priority: the io priority of the request.
1018 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1019 * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1020 * @user_data: (closure): the data to pass to callback function
1022 * Forces an asynchronous write of all user-space buffered data for
1023 * the given @stream.
1024 * For behaviour details see g_output_stream_flush().
1026 * When the operation is finished @callback will be
1027 * called. You can then call g_output_stream_flush_finish() to get the
1028 * result of the operation.
1031 g_output_stream_flush_async (GOutputStream *stream,
1033 GCancellable *cancellable,
1034 GAsyncReadyCallback callback,
1037 GOutputStreamClass *class;
1039 GError *error = NULL;
1041 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1043 task = g_task_new (stream, cancellable, callback, user_data);
1044 g_task_set_source_tag (task, g_output_stream_flush_async);
1045 g_task_set_priority (task, io_priority);
1047 if (!g_output_stream_set_pending (stream, &error))
1049 g_task_return_error (task, error);
1050 g_object_unref (task);
1054 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1056 if (class->flush_async == NULL)
1058 g_task_return_boolean (task, TRUE);
1059 g_object_unref (task);
1063 class->flush_async (stream, io_priority, cancellable,
1064 async_ready_flush_callback_wrapper, task);
1068 * g_output_stream_flush_finish:
1069 * @stream: a #GOutputStream.
1070 * @result: a GAsyncResult.
1071 * @error: a #GError location to store the error occurring, or %NULL to
1074 * Finishes flushing an output stream.
1076 * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1079 g_output_stream_flush_finish (GOutputStream *stream,
1080 GAsyncResult *result,
1083 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1084 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1085 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1087 /* @result is always the GTask created by g_output_stream_flush_async();
1088 * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1090 return g_task_propagate_boolean (G_TASK (result), error);
1095 async_ready_close_callback_wrapper (GObject *source_object,
1099 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1100 GOutputStreamClass *class;
1101 GTask *task = user_data;
1102 GError *error = g_task_get_task_data (task);
1104 stream->priv->closing = FALSE;
1105 stream->priv->closed = TRUE;
1107 if (!error && !g_async_result_legacy_propagate_error (res, &error))
1109 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1111 class->close_finish (stream, res,
1112 error ? NULL : &error);
1115 g_output_stream_clear_pending (stream);
1118 g_task_return_error (task, error);
1120 g_task_return_boolean (task, TRUE);
1121 g_object_unref (task);
1125 async_ready_close_flushed_callback_wrapper (GObject *source_object,
1129 GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1130 GOutputStreamClass *class;
1131 GTask *task = user_data;
1132 GError *error = NULL;
1134 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1136 if (!g_async_result_legacy_propagate_error (res, &error))
1138 class->flush_finish (stream, res, &error);
1141 /* propagate the possible error */
1143 g_task_set_task_data (task, error, NULL);
1145 /* we still close, even if there was a flush error */
1146 class->close_async (stream,
1147 g_task_get_priority (task),
1148 g_task_get_cancellable (task),
1149 async_ready_close_callback_wrapper, task);
1153 * g_output_stream_close_async:
1154 * @stream: A #GOutputStream.
1155 * @io_priority: the io priority of the request.
1156 * @cancellable: (allow-none): optional cancellable object
1157 * @callback: (scope async): callback to call when the request is satisfied
1158 * @user_data: (closure): the data to pass to callback function
1160 * Requests an asynchronous close of the stream, releasing resources
1161 * related to it. When the operation is finished @callback will be
1162 * called. You can then call g_output_stream_close_finish() to get
1163 * the result of the operation.
1165 * For behaviour details see g_output_stream_close().
1167 * The asyncronous methods have a default fallback that uses threads
1168 * to implement asynchronicity, so they are optional for inheriting
1169 * classes. However, if you override one you must override all.
1172 g_output_stream_close_async (GOutputStream *stream,
1174 GCancellable *cancellable,
1175 GAsyncReadyCallback callback,
1178 GOutputStreamClass *class;
1180 GError *error = NULL;
1182 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1184 task = g_task_new (stream, cancellable, callback, user_data);
1185 g_task_set_source_tag (task, g_output_stream_close_async);
1186 g_task_set_priority (task, io_priority);
1188 if (stream->priv->closed)
1190 g_task_return_boolean (task, TRUE);
1191 g_object_unref (task);
1195 if (!g_output_stream_set_pending (stream, &error))
1197 g_task_return_error (task, error);
1198 g_object_unref (task);
1202 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1203 stream->priv->closing = TRUE;
1205 /* Call close_async directly if there is no need to flush, or if the flush
1206 can be done sync (in the output stream async close thread) */
1207 if (class->flush_async == NULL ||
1208 (class->flush_async == g_output_stream_real_flush_async &&
1209 (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1211 class->close_async (stream, io_priority, cancellable,
1212 async_ready_close_callback_wrapper, task);
1216 /* First do an async flush, then do the async close in the callback
1217 wrapper (see async_ready_close_flushed_callback_wrapper) */
1218 class->flush_async (stream, io_priority, cancellable,
1219 async_ready_close_flushed_callback_wrapper, task);
1224 * g_output_stream_close_finish:
1225 * @stream: a #GOutputStream.
1226 * @result: a #GAsyncResult.
1227 * @error: a #GError location to store the error occurring, or %NULL to
1230 * Closes an output stream.
1232 * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1235 g_output_stream_close_finish (GOutputStream *stream,
1236 GAsyncResult *result,
1239 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1240 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1241 g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1243 /* @result is always the GTask created by g_output_stream_close_async();
1244 * we called class->close_finish() from async_ready_close_callback_wrapper.
1246 return g_task_propagate_boolean (G_TASK (result), error);
1250 * g_output_stream_is_closed:
1251 * @stream: a #GOutputStream.
1253 * Checks if an output stream has already been closed.
1255 * Returns: %TRUE if @stream is closed. %FALSE otherwise.
1258 g_output_stream_is_closed (GOutputStream *stream)
1260 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1262 return stream->priv->closed;
1266 * g_output_stream_is_closing:
1267 * @stream: a #GOutputStream.
1269 * Checks if an output stream is being closed. This can be
1270 * used inside e.g. a flush implementation to see if the
1271 * flush (or other i/o operation) is called from within
1272 * the closing operation.
1274 * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1279 g_output_stream_is_closing (GOutputStream *stream)
1281 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1283 return stream->priv->closing;
1287 * g_output_stream_has_pending:
1288 * @stream: a #GOutputStream.
1290 * Checks if an ouput stream has pending actions.
1292 * Returns: %TRUE if @stream has pending actions.
1295 g_output_stream_has_pending (GOutputStream *stream)
1297 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1299 return stream->priv->pending;
1303 * g_output_stream_set_pending:
1304 * @stream: a #GOutputStream.
1305 * @error: a #GError location to store the error occurring, or %NULL to
1308 * Sets @stream to have actions pending. If the pending flag is
1309 * already set or @stream is closed, it will return %FALSE and set
1312 * Return value: %TRUE if pending was previously unset and is now set.
1315 g_output_stream_set_pending (GOutputStream *stream,
1318 g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1320 if (stream->priv->closed)
1322 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1323 _("Stream is already closed"));
1327 if (stream->priv->pending)
1329 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1330 /* Translators: This is an error you get if there is
1331 * already an operation running against this stream when
1332 * you try to start one */
1333 _("Stream has outstanding operation"));
1337 stream->priv->pending = TRUE;
1342 * g_output_stream_clear_pending:
1343 * @stream: output stream
1345 * Clears the pending flag on @stream.
1348 g_output_stream_clear_pending (GOutputStream *stream)
1350 g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1352 stream->priv->pending = FALSE;
1356 /********************************************
1357 * Default implementation of async ops *
1358 ********************************************/
1362 gsize count_requested;
1363 gssize count_written;
1367 free_write_data (WriteData *op)
1369 g_slice_free (WriteData, op);
1373 write_async_thread (GTask *task,
1374 gpointer source_object,
1376 GCancellable *cancellable)
1378 GOutputStream *stream = source_object;
1379 WriteData *op = task_data;
1380 GOutputStreamClass *class;
1381 GError *error = NULL;
1382 gssize count_written;
1384 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1385 count_written = class->write_fn (stream, op->buffer, op->count_requested,
1386 cancellable, &error);
1387 if (count_written == -1)
1388 g_task_return_error (task, error);
1390 g_task_return_int (task, count_written);
1393 static void write_async_pollable (GPollableOutputStream *stream,
1397 write_async_pollable_ready (GPollableOutputStream *stream,
1400 GTask *task = user_data;
1402 write_async_pollable (stream, task);
1407 write_async_pollable (GPollableOutputStream *stream,
1410 GError *error = NULL;
1411 WriteData *op = g_task_get_task_data (task);
1412 gssize count_written;
1414 if (g_task_return_error_if_cancelled (task))
1417 count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1418 write_nonblocking (stream, op->buffer, op->count_requested, &error);
1420 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1424 g_error_free (error);
1426 source = g_pollable_output_stream_create_source (stream,
1427 g_task_get_cancellable (task));
1428 g_task_attach_source (task, source,
1429 (GSourceFunc) write_async_pollable_ready);
1430 g_source_unref (source);
1434 if (count_written == -1)
1435 g_task_return_error (task, error);
1437 g_task_return_int (task, count_written);
1441 g_output_stream_real_write_async (GOutputStream *stream,
1445 GCancellable *cancellable,
1446 GAsyncReadyCallback callback,
1452 op = g_slice_new0 (WriteData);
1453 task = g_task_new (stream, cancellable, callback, user_data);
1454 g_task_set_check_cancellable (task, FALSE);
1455 g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1456 op->buffer = buffer;
1457 op->count_requested = count;
1459 if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1460 g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1461 write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1463 g_task_run_in_thread (task, write_async_thread);
1464 g_object_unref (task);
1468 g_output_stream_real_write_finish (GOutputStream *stream,
1469 GAsyncResult *result,
1472 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1474 return g_task_propagate_int (G_TASK (result), error);
1478 GInputStream *source;
1479 GOutputStreamSpliceFlags flags;
1483 free_splice_data (SpliceData *op)
1485 g_object_unref (op->source);
1490 splice_async_thread (GTask *task,
1491 gpointer source_object,
1493 GCancellable *cancellable)
1495 GOutputStream *stream = source_object;
1496 SpliceData *op = task_data;
1497 GOutputStreamClass *class;
1498 GError *error = NULL;
1499 gssize bytes_copied;
1501 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1503 bytes_copied = class->splice (stream,
1508 if (bytes_copied == -1)
1509 g_task_return_error (task, error);
1511 g_task_return_int (task, bytes_copied);
1515 g_output_stream_real_splice_async (GOutputStream *stream,
1516 GInputStream *source,
1517 GOutputStreamSpliceFlags flags,
1519 GCancellable *cancellable,
1520 GAsyncReadyCallback callback,
1526 op = g_new0 (SpliceData, 1);
1527 task = g_task_new (stream, cancellable, callback, user_data);
1528 g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1530 op->source = g_object_ref (source);
1532 /* TODO: In the case where both source and destintion have
1533 non-threadbased async calls we can use a true async copy here */
1535 g_task_run_in_thread (task, splice_async_thread);
1536 g_object_unref (task);
1540 g_output_stream_real_splice_finish (GOutputStream *stream,
1541 GAsyncResult *result,
1544 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1546 return g_task_propagate_int (G_TASK (result), error);
1551 flush_async_thread (GTask *task,
1552 gpointer source_object,
1554 GCancellable *cancellable)
1556 GOutputStream *stream = source_object;
1557 GOutputStreamClass *class;
1559 GError *error = NULL;
1561 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1564 result = class->flush (stream, cancellable, &error);
1567 g_task_return_boolean (task, TRUE);
1569 g_task_return_error (task, error);
1573 g_output_stream_real_flush_async (GOutputStream *stream,
1575 GCancellable *cancellable,
1576 GAsyncReadyCallback callback,
1581 task = g_task_new (stream, cancellable, callback, user_data);
1582 g_task_set_priority (task, io_priority);
1583 g_task_run_in_thread (task, flush_async_thread);
1584 g_object_unref (task);
1588 g_output_stream_real_flush_finish (GOutputStream *stream,
1589 GAsyncResult *result,
1592 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1594 return g_task_propagate_boolean (G_TASK (result), error);
1598 close_async_thread (GTask *task,
1599 gpointer source_object,
1601 GCancellable *cancellable)
1603 GOutputStream *stream = source_object;
1604 GOutputStreamClass *class;
1605 GError *error = NULL;
1606 gboolean result = TRUE;
1608 class = G_OUTPUT_STREAM_GET_CLASS (stream);
1610 /* Do a flush here if there is a flush function, and we did not have to do
1611 * an async flush before (see g_output_stream_close_async)
1613 if (class->flush != NULL &&
1614 (class->flush_async == NULL ||
1615 class->flush_async == g_output_stream_real_flush_async))
1617 result = class->flush (stream, cancellable, &error);
1620 /* Auto handling of cancelation disabled, and ignore
1621 cancellation, since we want to close things anyway, although
1622 possibly in a quick-n-dirty way. At least we never want to leak
1625 if (class->close_fn)
1627 /* Make sure to close, even if the flush failed (see sync close) */
1629 class->close_fn (stream, cancellable, NULL);
1631 result = class->close_fn (stream, cancellable, &error);
1635 g_task_return_boolean (task, TRUE);
1637 g_task_return_error (task, error);
1641 g_output_stream_real_close_async (GOutputStream *stream,
1643 GCancellable *cancellable,
1644 GAsyncReadyCallback callback,
1649 task = g_task_new (stream, cancellable, callback, user_data);
1650 g_task_set_priority (task, io_priority);
1651 g_task_run_in_thread (task, close_async_thread);
1652 g_object_unref (task);
1656 g_output_stream_real_close_finish (GOutputStream *stream,
1657 GAsyncResult *result,
1660 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1662 return g_task_propagate_boolean (G_TASK (result), error);