1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * SPDX-License-Identifier: LGPL-2.1-or-later
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General
18 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 * Author: Alexander Larsson <alexl@redhat.com>
27 #include "ginputstream.h"
28 #include "gioprivate.h"
29 #include "gseekable.h"
30 #include "gcancellable.h"
31 #include "gasyncresult.h"
33 #include "gpollableinputstream.h"
38 * `GInputStream` is a base class for implementing streaming input.
40 * It has functions to read from a stream ([method@Gio.InputStream.read]),
41 * to close a stream ([method@Gio.InputStream.close]) and to skip some content
42 * ([method@Gio.InputStream.skip]).
44 * To copy the content of an input stream to an output stream without
45 * manually handling the reads and writes, use [method@Gio.OutputStream.splice].
47 * See the documentation for [class@Gio.IOStream] for details of thread safety
50 * All of these functions have async variants too.
53 struct _GInputStreamPrivate {
56 GAsyncReadyCallback outstanding_callback;
59 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
61 static gssize g_input_stream_real_skip (GInputStream *stream,
63 GCancellable *cancellable,
65 static void g_input_stream_real_read_async (GInputStream *stream,
69 GCancellable *cancellable,
70 GAsyncReadyCallback callback,
72 static gssize g_input_stream_real_read_finish (GInputStream *stream,
75 static void g_input_stream_real_skip_async (GInputStream *stream,
78 GCancellable *cancellable,
79 GAsyncReadyCallback callback,
81 static gssize g_input_stream_real_skip_finish (GInputStream *stream,
84 static void g_input_stream_real_close_async (GInputStream *stream,
86 GCancellable *cancellable,
87 GAsyncReadyCallback callback,
89 static gboolean g_input_stream_real_close_finish (GInputStream *stream,
94 g_input_stream_dispose (GObject *object)
98 stream = G_INPUT_STREAM (object);
100 if (!stream->priv->closed)
101 g_input_stream_close (stream, NULL, NULL);
103 G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
108 g_input_stream_class_init (GInputStreamClass *klass)
110 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
112 gobject_class->dispose = g_input_stream_dispose;
114 klass->skip = g_input_stream_real_skip;
115 klass->read_async = g_input_stream_real_read_async;
116 klass->read_finish = g_input_stream_real_read_finish;
117 klass->skip_async = g_input_stream_real_skip_async;
118 klass->skip_finish = g_input_stream_real_skip_finish;
119 klass->close_async = g_input_stream_real_close_async;
120 klass->close_finish = g_input_stream_real_close_finish;
124 g_input_stream_init (GInputStream *stream)
126 stream->priv = g_input_stream_get_instance_private (stream);
130 * g_input_stream_read:
131 * @stream: a #GInputStream.
132 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
133 * a buffer to read data into (which should be at least count bytes long).
134 * @count: (in): the number of bytes that will be read from the stream
135 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
136 * @error: location to store the error occurring, or %NULL to ignore
138 * Tries to read @count bytes from the stream into the buffer starting at
139 * @buffer. Will block during this read.
141 * If count is zero returns zero and does nothing. A value of @count
142 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
144 * On success, the number of bytes read into the buffer is returned.
145 * It is not an error if this is not the same as the requested size, as it
146 * can happen e.g. near the end of a file. Zero is returned on end of file
147 * (or if @count is zero), but never otherwise.
149 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
150 * at any position, and this function doesn't nul-terminate the @buffer.
152 * If @cancellable is not %NULL, then the operation can be cancelled by
153 * triggering the cancellable object from another thread. If the operation
154 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
155 * operation was partially finished when the operation was cancelled the
156 * partial result will be returned, without an error.
158 * On error -1 is returned and @error is set accordingly.
160 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
163 g_input_stream_read (GInputStream *stream,
166 GCancellable *cancellable,
169 GInputStreamClass *class;
172 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
173 g_return_val_if_fail (buffer != NULL, 0);
178 if (((gssize) count) < 0)
180 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
181 _("Too large count value passed to %s"), G_STRFUNC);
185 class = G_INPUT_STREAM_GET_CLASS (stream);
187 if (class->read_fn == NULL)
189 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
190 _("Input stream doesn’t implement read"));
194 if (!g_input_stream_set_pending (stream, error))
198 g_cancellable_push_current (cancellable);
200 res = class->read_fn (stream, buffer, count, cancellable, error);
203 g_cancellable_pop_current (cancellable);
205 g_input_stream_clear_pending (stream);
211 * g_input_stream_read_all:
212 * @stream: a #GInputStream.
213 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
214 * a buffer to read data into (which should be at least count bytes long).
215 * @count: (in): the number of bytes that will be read from the stream
216 * @bytes_read: (out): location to store the number of bytes that was read from the stream
217 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
218 * @error: location to store the error occurring, or %NULL to ignore
220 * Tries to read @count bytes from the stream into the buffer starting at
221 * @buffer. Will block during this read.
223 * This function is similar to g_input_stream_read(), except it tries to
224 * read as many bytes as requested, only stopping on an error or end of stream.
226 * On a successful read of @count bytes, or if we reached the end of the
227 * stream, %TRUE is returned, and @bytes_read is set to the number of bytes
230 * If there is an error during the operation %FALSE is returned and @error
231 * is set to indicate the error status.
233 * As a special exception to the normal conventions for functions that
234 * use #GError, if this function returns %FALSE (and sets @error) then
235 * @bytes_read will be set to the number of bytes that were successfully
236 * read before the error was encountered. This functionality is only
237 * available from C. If you need it from another language then you must
238 * write your own loop around g_input_stream_read().
240 * Returns: %TRUE on success, %FALSE if there was an error
243 g_input_stream_read_all (GInputStream *stream,
247 GCancellable *cancellable,
253 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
254 g_return_val_if_fail (buffer != NULL, FALSE);
257 while (_bytes_read < count)
259 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
264 *bytes_read = _bytes_read;
275 *bytes_read = _bytes_read;
280 * g_input_stream_read_bytes:
281 * @stream: a #GInputStream.
282 * @count: maximum number of bytes that will be read from the stream. Common
283 * values include 4096 and 8192.
284 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
285 * @error: location to store the error occurring, or %NULL to ignore
287 * Like g_input_stream_read(), this tries to read @count bytes from
288 * the stream in a blocking fashion. However, rather than reading into
289 * a user-supplied buffer, this will create a new #GBytes containing
290 * the data that was read. This may be easier to use from language
293 * If count is zero, returns a zero-length #GBytes and does nothing. A
294 * value of @count larger than %G_MAXSSIZE will cause a
295 * %G_IO_ERROR_INVALID_ARGUMENT error.
297 * On success, a new #GBytes is returned. It is not an error if the
298 * size of this object is not the same as the requested size, as it
299 * can happen e.g. near the end of a file. A zero-length #GBytes is
300 * returned on end of file (or if @count is zero), but never
303 * If @cancellable is not %NULL, then the operation can be cancelled by
304 * triggering the cancellable object from another thread. If the operation
305 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
306 * operation was partially finished when the operation was cancelled the
307 * partial result will be returned, without an error.
309 * On error %NULL is returned and @error is set accordingly.
311 * Returns: (transfer full): a new #GBytes, or %NULL on error
316 g_input_stream_read_bytes (GInputStream *stream,
318 GCancellable *cancellable,
324 buf = g_malloc (count);
325 nread = g_input_stream_read (stream, buf, count, cancellable, error);
334 return g_bytes_new_static ("", 0);
337 return g_bytes_new_take (buf, nread);
341 * g_input_stream_skip:
342 * @stream: a #GInputStream.
343 * @count: the number of bytes that will be skipped from the stream
344 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
345 * @error: location to store the error occurring, or %NULL to ignore
347 * Tries to skip @count bytes from the stream. Will block during the operation.
349 * This is identical to g_input_stream_read(), from a behaviour standpoint,
350 * but the bytes that are skipped are not returned to the user. Some
351 * streams have an implementation that is more efficient than reading the data.
353 * This function is optional for inherited classes, as the default implementation
354 * emulates it using read.
356 * If @cancellable is not %NULL, then the operation can be cancelled by
357 * triggering the cancellable object from another thread. If the operation
358 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
359 * operation was partially finished when the operation was cancelled the
360 * partial result will be returned, without an error.
362 * Returns: Number of bytes skipped, or -1 on error
365 g_input_stream_skip (GInputStream *stream,
367 GCancellable *cancellable,
370 GInputStreamClass *class;
373 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
378 if (((gssize) count) < 0)
380 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
381 _("Too large count value passed to %s"), G_STRFUNC);
385 class = G_INPUT_STREAM_GET_CLASS (stream);
387 if (!g_input_stream_set_pending (stream, error))
391 g_cancellable_push_current (cancellable);
393 res = class->skip (stream, count, cancellable, error);
396 g_cancellable_pop_current (cancellable);
398 g_input_stream_clear_pending (stream);
404 g_input_stream_real_skip (GInputStream *stream,
406 GCancellable *cancellable,
409 GInputStreamClass *class;
410 gssize ret, read_bytes;
414 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
416 GSeekable *seekable = G_SEEKABLE (stream);
420 /* g_seekable_seek() may try to set pending itself */
421 stream->priv->pending = FALSE;
423 start = g_seekable_tell (seekable);
425 if (g_seekable_seek (G_SEEKABLE (stream),
431 end = g_seekable_tell (seekable);
432 g_assert (start >= 0);
433 g_assert (end >= start);
434 if (start > (goffset) (G_MAXOFFSET - count) ||
435 (goffset) (start + count) > end)
437 stream->priv->pending = TRUE;
441 success = g_seekable_seek (G_SEEKABLE (stream),
446 stream->priv->pending = TRUE;
455 /* If not seekable, or seek failed, fall back to reading data: */
457 class = G_INPUT_STREAM_GET_CLASS (stream);
464 ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
465 cancellable, &my_error);
468 if (read_bytes > 0 &&
469 my_error->domain == G_IO_ERROR &&
470 my_error->code == G_IO_ERROR_CANCELLED)
472 g_error_free (my_error);
476 g_propagate_error (error, my_error);
483 if (ret == 0 || count == 0)
489 * g_input_stream_close:
490 * @stream: A #GInputStream.
491 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
492 * @error: location to store the error occurring, or %NULL to ignore
494 * Closes the stream, releasing resources related to it.
496 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
497 * Closing a stream multiple times will not return an error.
499 * Streams will be automatically closed when the last reference
500 * is dropped, but you might want to call this function to make sure
501 * resources are released as early as possible.
503 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
504 * open after the stream is closed. See the documentation for the individual
505 * stream for details.
507 * On failure the first error that happened will be reported, but the close
508 * operation will finish as much as possible. A stream that failed to
509 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
510 * is important to check and report the error to the user.
512 * If @cancellable is not %NULL, then the operation can be cancelled by
513 * triggering the cancellable object from another thread. If the operation
514 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
515 * Cancelling a close will still leave the stream closed, but some streams
516 * can use a faster close that doesn't block to e.g. check errors.
518 * Returns: %TRUE on success, %FALSE on failure
521 g_input_stream_close (GInputStream *stream,
522 GCancellable *cancellable,
525 GInputStreamClass *class;
528 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
530 class = G_INPUT_STREAM_GET_CLASS (stream);
532 if (stream->priv->closed)
537 if (!g_input_stream_set_pending (stream, error))
541 g_cancellable_push_current (cancellable);
544 res = class->close_fn (stream, cancellable, error);
547 g_cancellable_pop_current (cancellable);
549 g_input_stream_clear_pending (stream);
551 stream->priv->closed = TRUE;
557 async_ready_callback_wrapper (GObject *source_object,
561 GInputStream *stream = G_INPUT_STREAM (source_object);
563 g_input_stream_clear_pending (stream);
564 if (stream->priv->outstanding_callback)
565 (*stream->priv->outstanding_callback) (source_object, res, user_data);
566 g_object_unref (stream);
570 async_ready_close_callback_wrapper (GObject *source_object,
574 GInputStream *stream = G_INPUT_STREAM (source_object);
576 g_input_stream_clear_pending (stream);
577 stream->priv->closed = TRUE;
578 if (stream->priv->outstanding_callback)
579 (*stream->priv->outstanding_callback) (source_object, res, user_data);
580 g_object_unref (stream);
584 * g_input_stream_read_async:
585 * @stream: A #GInputStream.
586 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
587 * a buffer to read data into (which should be at least count bytes long).
588 * @count: (in): the number of bytes that will be read from the stream
589 * @io_priority: the [I/O priority][io-priority]
591 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
592 * @callback: (scope async): a #GAsyncReadyCallback
593 * to call when the request is satisfied
594 * @user_data: the data to pass to callback function
596 * Request an asynchronous read of @count bytes from the stream into the buffer
597 * starting at @buffer. When the operation is finished @callback will be called.
598 * You can then call g_input_stream_read_finish() to get the result of the
601 * During an async request no other sync and async calls are allowed on @stream, and will
602 * result in %G_IO_ERROR_PENDING errors.
604 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
606 * On success, the number of bytes read into the buffer will be passed to the
607 * callback. It is not an error if this is not the same as the requested size, as it
608 * can happen e.g. near the end of a file, but generally we try to read
609 * as many bytes as requested. Zero is returned on end of file
610 * (or if @count is zero), but never otherwise.
612 * Any outstanding i/o request with higher priority (lower numerical value) will
613 * be executed before an outstanding request with lower priority. Default
614 * priority is %G_PRIORITY_DEFAULT.
616 * The asynchronous methods have a default fallback that uses threads to implement
617 * asynchronicity, so they are optional for inheriting classes. However, if you
618 * override one you must override all.
621 g_input_stream_read_async (GInputStream *stream,
625 GCancellable *cancellable,
626 GAsyncReadyCallback callback,
629 GInputStreamClass *class;
630 GError *error = NULL;
632 g_return_if_fail (G_IS_INPUT_STREAM (stream));
633 g_return_if_fail (buffer != NULL);
639 task = g_task_new (stream, cancellable, callback, user_data);
640 g_task_set_source_tag (task, g_input_stream_read_async);
641 g_task_return_int (task, 0);
642 g_object_unref (task);
646 if (((gssize) count) < 0)
648 g_task_report_new_error (stream, callback, user_data,
649 g_input_stream_read_async,
650 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
651 _("Too large count value passed to %s"),
656 if (!g_input_stream_set_pending (stream, &error))
658 g_task_report_error (stream, callback, user_data,
659 g_input_stream_read_async,
664 class = G_INPUT_STREAM_GET_CLASS (stream);
665 stream->priv->outstanding_callback = callback;
666 g_object_ref (stream);
667 class->read_async (stream, buffer, count, io_priority, cancellable,
668 async_ready_callback_wrapper, user_data);
672 * g_input_stream_read_finish:
673 * @stream: a #GInputStream.
674 * @result: a #GAsyncResult.
675 * @error: a #GError location to store the error occurring, or %NULL to
678 * Finishes an asynchronous stream read operation.
680 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
683 g_input_stream_read_finish (GInputStream *stream,
684 GAsyncResult *result,
687 GInputStreamClass *class;
689 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
690 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
692 if (g_async_result_legacy_propagate_error (result, error))
694 else if (g_async_result_is_tagged (result, g_input_stream_read_async))
695 return g_task_propagate_int (G_TASK (result), error);
697 class = G_INPUT_STREAM_GET_CLASS (stream);
698 return class->read_finish (stream, result, error);
709 free_async_read_all (gpointer data)
711 g_slice_free (AsyncReadAll, data);
715 read_all_callback (GObject *stream,
716 GAsyncResult *result,
719 GTask *task = user_data;
720 AsyncReadAll *data = g_task_get_task_data (task);
721 gboolean got_eof = FALSE;
725 GError *error = NULL;
728 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
732 g_task_return_error (task, error);
733 g_object_unref (task);
737 g_assert_cmpint (nread, <=, data->to_read);
738 data->to_read -= nread;
739 data->bytes_read += nread;
740 got_eof = (nread == 0);
743 if (got_eof || data->to_read == 0)
745 g_task_return_boolean (task, TRUE);
746 g_object_unref (task);
750 g_input_stream_read_async (G_INPUT_STREAM (stream),
751 data->buffer + data->bytes_read,
753 g_task_get_priority (task),
754 g_task_get_cancellable (task),
755 read_all_callback, task);
760 read_all_async_thread (GTask *task,
761 gpointer source_object,
763 GCancellable *cancellable)
765 GInputStream *stream = source_object;
766 AsyncReadAll *data = task_data;
767 GError *error = NULL;
769 if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
770 g_task_get_cancellable (task), &error))
771 g_task_return_boolean (task, TRUE);
773 g_task_return_error (task, error);
777 * g_input_stream_read_all_async:
778 * @stream: A #GInputStream
779 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
780 * a buffer to read data into (which should be at least count bytes long)
781 * @count: (in): the number of bytes that will be read from the stream
782 * @io_priority: the [I/O priority][io-priority] of the request
783 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
784 * @callback: (scope async): a #GAsyncReadyCallback
785 * to call when the request is satisfied
786 * @user_data: the data to pass to callback function
788 * Request an asynchronous read of @count bytes from the stream into the
789 * buffer starting at @buffer.
791 * This is the asynchronous equivalent of g_input_stream_read_all().
793 * Call g_input_stream_read_all_finish() to collect the result.
795 * Any outstanding I/O request with higher priority (lower numerical
796 * value) will be executed before an outstanding request with lower
797 * priority. Default priority is %G_PRIORITY_DEFAULT.
802 g_input_stream_read_all_async (GInputStream *stream,
806 GCancellable *cancellable,
807 GAsyncReadyCallback callback,
813 g_return_if_fail (G_IS_INPUT_STREAM (stream));
814 g_return_if_fail (buffer != NULL || count == 0);
816 task = g_task_new (stream, cancellable, callback, user_data);
817 data = g_slice_new0 (AsyncReadAll);
818 data->buffer = buffer;
819 data->to_read = count;
821 g_task_set_source_tag (task, g_input_stream_read_all_async);
822 g_task_set_task_data (task, data, free_async_read_all);
823 g_task_set_priority (task, io_priority);
825 /* If async reads are going to be handled via the threadpool anyway
826 * then we may as well do it with a single dispatch instead of
827 * bouncing in and out.
829 if (g_input_stream_async_read_is_via_threads (stream))
831 g_task_run_in_thread (task, read_all_async_thread);
832 g_object_unref (task);
835 read_all_callback (G_OBJECT (stream), NULL, task);
839 * g_input_stream_read_all_finish:
840 * @stream: a #GInputStream
841 * @result: a #GAsyncResult
842 * @bytes_read: (out): location to store the number of bytes that was read from the stream
843 * @error: a #GError location to store the error occurring, or %NULL to ignore
845 * Finishes an asynchronous stream read operation started with
846 * g_input_stream_read_all_async().
848 * As a special exception to the normal conventions for functions that
849 * use #GError, if this function returns %FALSE (and sets @error) then
850 * @bytes_read will be set to the number of bytes that were successfully
851 * read before the error was encountered. This functionality is only
852 * available from C. If you need it from another language then you must
853 * write your own loop around g_input_stream_read_async().
855 * Returns: %TRUE on success, %FALSE if there was an error
860 g_input_stream_read_all_finish (GInputStream *stream,
861 GAsyncResult *result,
867 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
868 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
870 task = G_TASK (result);
874 AsyncReadAll *data = g_task_get_task_data (task);
876 *bytes_read = data->bytes_read;
879 return g_task_propagate_boolean (task, error);
883 read_bytes_callback (GObject *stream,
884 GAsyncResult *result,
887 GTask *task = user_data;
888 guchar *buf = g_task_get_task_data (task);
889 GError *error = NULL;
891 GBytes *bytes = NULL;
893 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
898 g_task_return_error (task, error);
903 bytes = g_bytes_new_static ("", 0);
906 bytes = g_bytes_new_take (buf, nread);
909 g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
911 g_object_unref (task);
915 * g_input_stream_read_bytes_async:
916 * @stream: A #GInputStream.
917 * @count: the number of bytes that will be read from the stream
918 * @io_priority: the [I/O priority][io-priority] of the request
919 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
920 * @callback: (scope async): a #GAsyncReadyCallback
921 * to call when the request is satisfied
922 * @user_data: the data to pass to callback function
924 * Request an asynchronous read of @count bytes from the stream into a
925 * new #GBytes. When the operation is finished @callback will be
926 * called. You can then call g_input_stream_read_bytes_finish() to get the
927 * result of the operation.
929 * During an async request no other sync and async calls are allowed
930 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
932 * A value of @count larger than %G_MAXSSIZE will cause a
933 * %G_IO_ERROR_INVALID_ARGUMENT error.
935 * On success, the new #GBytes will be passed to the callback. It is
936 * not an error if this is smaller than the requested size, as it can
937 * happen e.g. near the end of a file, but generally we try to read as
938 * many bytes as requested. Zero is returned on end of file (or if
939 * @count is zero), but never otherwise.
941 * Any outstanding I/O request with higher priority (lower numerical
942 * value) will be executed before an outstanding request with lower
943 * priority. Default priority is %G_PRIORITY_DEFAULT.
948 g_input_stream_read_bytes_async (GInputStream *stream,
951 GCancellable *cancellable,
952 GAsyncReadyCallback callback,
958 task = g_task_new (stream, cancellable, callback, user_data);
959 g_task_set_source_tag (task, g_input_stream_read_bytes_async);
961 buf = g_malloc (count);
962 g_task_set_task_data (task, buf, NULL);
964 g_input_stream_read_async (stream, buf, count,
965 io_priority, cancellable,
966 read_bytes_callback, task);
970 * g_input_stream_read_bytes_finish:
971 * @stream: a #GInputStream.
972 * @result: a #GAsyncResult.
973 * @error: a #GError location to store the error occurring, or %NULL to
976 * Finishes an asynchronous stream read-into-#GBytes operation.
978 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
983 g_input_stream_read_bytes_finish (GInputStream *stream,
984 GAsyncResult *result,
987 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
988 g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
990 return g_task_propagate_pointer (G_TASK (result), error);
994 * g_input_stream_skip_async:
995 * @stream: A #GInputStream.
996 * @count: the number of bytes that will be skipped from the stream
997 * @io_priority: the [I/O priority][io-priority] of the request
998 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
999 * @callback: (scope async): a #GAsyncReadyCallback
1000 * to call when the request is satisfied
1001 * @user_data: the data to pass to callback function
1003 * Request an asynchronous skip of @count bytes from the stream.
1004 * When the operation is finished @callback will be called.
1005 * You can then call g_input_stream_skip_finish() to get the result
1008 * During an async request no other sync and async calls are allowed,
1009 * and will result in %G_IO_ERROR_PENDING errors.
1011 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1013 * On success, the number of bytes skipped will be passed to the callback.
1014 * It is not an error if this is not the same as the requested size, as it
1015 * can happen e.g. near the end of a file, but generally we try to skip
1016 * as many bytes as requested. Zero is returned on end of file
1017 * (or if @count is zero), but never otherwise.
1019 * Any outstanding i/o request with higher priority (lower numerical value)
1020 * will be executed before an outstanding request with lower priority.
1021 * Default priority is %G_PRIORITY_DEFAULT.
1023 * The asynchronous methods have a default fallback that uses threads to
1024 * implement asynchronicity, so they are optional for inheriting classes.
1025 * However, if you override one, you must override all.
1028 g_input_stream_skip_async (GInputStream *stream,
1031 GCancellable *cancellable,
1032 GAsyncReadyCallback callback,
1035 GInputStreamClass *class;
1036 GError *error = NULL;
1038 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1044 task = g_task_new (stream, cancellable, callback, user_data);
1045 g_task_set_source_tag (task, g_input_stream_skip_async);
1046 g_task_return_int (task, 0);
1047 g_object_unref (task);
1051 if (((gssize) count) < 0)
1053 g_task_report_new_error (stream, callback, user_data,
1054 g_input_stream_skip_async,
1055 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1056 _("Too large count value passed to %s"),
1061 if (!g_input_stream_set_pending (stream, &error))
1063 g_task_report_error (stream, callback, user_data,
1064 g_input_stream_skip_async,
1069 class = G_INPUT_STREAM_GET_CLASS (stream);
1070 stream->priv->outstanding_callback = callback;
1071 g_object_ref (stream);
1072 class->skip_async (stream, count, io_priority, cancellable,
1073 async_ready_callback_wrapper, user_data);
1077 * g_input_stream_skip_finish:
1078 * @stream: a #GInputStream.
1079 * @result: a #GAsyncResult.
1080 * @error: a #GError location to store the error occurring, or %NULL to
1083 * Finishes a stream skip operation.
1085 * Returns: the size of the bytes skipped, or `-1` on error.
1088 g_input_stream_skip_finish (GInputStream *stream,
1089 GAsyncResult *result,
1092 GInputStreamClass *class;
1094 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1095 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1097 if (g_async_result_legacy_propagate_error (result, error))
1099 else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1100 return g_task_propagate_int (G_TASK (result), error);
1102 class = G_INPUT_STREAM_GET_CLASS (stream);
1103 return class->skip_finish (stream, result, error);
1107 * g_input_stream_close_async:
1108 * @stream: A #GInputStream.
1109 * @io_priority: the [I/O priority][io-priority] of the request
1110 * @cancellable: (nullable): optional cancellable object
1111 * @callback: (scope async): a #GAsyncReadyCallback
1112 * to call when the request is satisfied
1113 * @user_data: the data to pass to callback function
1115 * Requests an asynchronous closes of the stream, releasing resources related to it.
1116 * When the operation is finished @callback will be called.
1117 * You can then call g_input_stream_close_finish() to get the result of the
1120 * For behaviour details see g_input_stream_close().
1122 * The asynchronous methods have a default fallback that uses threads to implement
1123 * asynchronicity, so they are optional for inheriting classes. However, if you
1124 * override one you must override all.
1127 g_input_stream_close_async (GInputStream *stream,
1129 GCancellable *cancellable,
1130 GAsyncReadyCallback callback,
1133 GInputStreamClass *class;
1134 GError *error = NULL;
1136 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1138 if (stream->priv->closed)
1142 task = g_task_new (stream, cancellable, callback, user_data);
1143 g_task_set_source_tag (task, g_input_stream_close_async);
1144 g_task_return_boolean (task, TRUE);
1145 g_object_unref (task);
1149 if (!g_input_stream_set_pending (stream, &error))
1151 g_task_report_error (stream, callback, user_data,
1152 g_input_stream_close_async,
1157 class = G_INPUT_STREAM_GET_CLASS (stream);
1158 stream->priv->outstanding_callback = callback;
1159 g_object_ref (stream);
1160 class->close_async (stream, io_priority, cancellable,
1161 async_ready_close_callback_wrapper, user_data);
1165 * g_input_stream_close_finish:
1166 * @stream: a #GInputStream.
1167 * @result: a #GAsyncResult.
1168 * @error: a #GError location to store the error occurring, or %NULL to
1171 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1173 * Returns: %TRUE if the stream was closed successfully.
1176 g_input_stream_close_finish (GInputStream *stream,
1177 GAsyncResult *result,
1180 GInputStreamClass *class;
1182 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1183 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1185 if (g_async_result_legacy_propagate_error (result, error))
1187 else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1188 return g_task_propagate_boolean (G_TASK (result), error);
1190 class = G_INPUT_STREAM_GET_CLASS (stream);
1191 return class->close_finish (stream, result, error);
1195 * g_input_stream_is_closed:
1196 * @stream: input stream.
1198 * Checks if an input stream is closed.
1200 * Returns: %TRUE if the stream is closed.
1203 g_input_stream_is_closed (GInputStream *stream)
1205 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1207 return stream->priv->closed;
1211 * g_input_stream_has_pending:
1212 * @stream: input stream.
1214 * Checks if an input stream has pending actions.
1216 * Returns: %TRUE if @stream has pending actions.
1219 g_input_stream_has_pending (GInputStream *stream)
1221 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1223 return stream->priv->pending;
1227 * g_input_stream_set_pending:
1228 * @stream: input stream
1229 * @error: a #GError location to store the error occurring, or %NULL to
1232 * Sets @stream to have actions pending. If the pending flag is
1233 * already set or @stream is closed, it will return %FALSE and set
1236 * Returns: %TRUE if pending was previously unset and is now set.
1239 g_input_stream_set_pending (GInputStream *stream, GError **error)
1241 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1243 if (stream->priv->closed)
1245 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1246 _("Stream is already closed"));
1250 if (stream->priv->pending)
1252 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1253 /* Translators: This is an error you get if there is already an
1254 * operation running against this stream when you try to start
1256 _("Stream has outstanding operation"));
1260 stream->priv->pending = TRUE;
1265 * g_input_stream_clear_pending:
1266 * @stream: input stream
1268 * Clears the pending flag on @stream.
1271 g_input_stream_clear_pending (GInputStream *stream)
1273 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1275 stream->priv->pending = FALSE;
1279 * g_input_stream_async_read_is_via_threads:
1280 * @stream: input stream
1282 * Checks if an input stream's read_async function uses threads.
1284 * Returns: %TRUE if @stream's read_async function uses threads.
1287 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1289 GInputStreamClass *class;
1291 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1293 class = G_INPUT_STREAM_GET_CLASS (stream);
1295 return (class->read_async == g_input_stream_real_read_async &&
1296 !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1297 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1301 * g_input_stream_async_close_is_via_threads:
1302 * @stream: input stream
1304 * Checks if an input stream's close_async function uses threads.
1306 * Returns: %TRUE if @stream's close_async function uses threads.
1309 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1311 GInputStreamClass *class;
1313 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1315 class = G_INPUT_STREAM_GET_CLASS (stream);
1317 return class->close_async == g_input_stream_real_close_async;
1320 /********************************************
1321 * Default implementation of async ops *
1322 ********************************************/
1330 free_read_data (ReadData *op)
1332 g_slice_free (ReadData, op);
1336 read_async_thread (GTask *task,
1337 gpointer source_object,
1339 GCancellable *cancellable)
1341 GInputStream *stream = source_object;
1342 ReadData *op = task_data;
1343 GInputStreamClass *class;
1344 GError *error = NULL;
1347 class = G_INPUT_STREAM_GET_CLASS (stream);
1349 nread = class->read_fn (stream,
1350 op->buffer, op->count,
1351 g_task_get_cancellable (task),
1354 g_task_return_error (task, error);
1356 g_task_return_int (task, nread);
1359 static void read_async_pollable (GPollableInputStream *stream,
1363 read_async_pollable_ready (GPollableInputStream *stream,
1366 GTask *task = user_data;
1368 read_async_pollable (stream, task);
1373 read_async_pollable (GPollableInputStream *stream,
1376 ReadData *op = g_task_get_task_data (task);
1377 GError *error = NULL;
1380 if (g_task_return_error_if_cancelled (task))
1383 nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1384 read_nonblocking (stream, op->buffer, op->count, &error);
1386 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1390 g_error_free (error);
1392 source = g_pollable_input_stream_create_source (stream,
1393 g_task_get_cancellable (task));
1394 g_task_attach_source (task, source,
1395 (GSourceFunc) read_async_pollable_ready);
1396 g_source_unref (source);
1401 g_task_return_error (task, error);
1403 g_task_return_int (task, nread);
1404 /* g_input_stream_real_read_async() unrefs task */
1409 g_input_stream_real_read_async (GInputStream *stream,
1413 GCancellable *cancellable,
1414 GAsyncReadyCallback callback,
1420 op = g_slice_new0 (ReadData);
1421 task = g_task_new (stream, cancellable, callback, user_data);
1422 g_task_set_source_tag (task, g_input_stream_real_read_async);
1423 g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1424 g_task_set_priority (task, io_priority);
1425 op->buffer = buffer;
1428 if (!g_input_stream_async_read_is_via_threads (stream))
1429 read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1431 g_task_run_in_thread (task, read_async_thread);
1432 g_object_unref (task);
1436 g_input_stream_real_read_finish (GInputStream *stream,
1437 GAsyncResult *result,
1440 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1442 return g_task_propagate_int (G_TASK (result), error);
1447 skip_async_thread (GTask *task,
1448 gpointer source_object,
1450 GCancellable *cancellable)
1452 GInputStream *stream = source_object;
1453 gsize count = GPOINTER_TO_SIZE (task_data);
1454 GInputStreamClass *class;
1455 GError *error = NULL;
1458 class = G_INPUT_STREAM_GET_CLASS (stream);
1459 ret = class->skip (stream, count,
1460 g_task_get_cancellable (task),
1463 g_task_return_error (task, error);
1465 g_task_return_int (task, ret);
1471 gsize count_skipped;
1472 } SkipFallbackAsyncData;
1475 skip_callback_wrapper (GObject *source_object,
1479 GInputStreamClass *class;
1480 GTask *task = user_data;
1481 SkipFallbackAsyncData *data = g_task_get_task_data (task);
1482 GError *error = NULL;
1485 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1490 data->count_skipped += ret;
1492 if (data->count > 0)
1494 class = G_INPUT_STREAM_GET_CLASS (source_object);
1495 class->read_async (G_INPUT_STREAM (source_object),
1496 data->buffer, MIN (8192, data->count),
1497 g_task_get_priority (task),
1498 g_task_get_cancellable (task),
1499 skip_callback_wrapper, task);
1505 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1506 data->count_skipped)
1508 /* No error, return partial read */
1509 g_clear_error (&error);
1513 g_task_return_error (task, error);
1515 g_task_return_int (task, data->count_skipped);
1516 g_object_unref (task);
1520 g_input_stream_real_skip_async (GInputStream *stream,
1523 GCancellable *cancellable,
1524 GAsyncReadyCallback callback,
1527 GInputStreamClass *class;
1528 SkipFallbackAsyncData *data;
1531 class = G_INPUT_STREAM_GET_CLASS (stream);
1533 task = g_task_new (stream, cancellable, callback, user_data);
1534 g_task_set_source_tag (task, g_input_stream_real_skip_async);
1535 g_task_set_priority (task, io_priority);
1537 if (g_input_stream_async_read_is_via_threads (stream))
1539 /* Read is thread-using async fallback.
1540 * Make skip use threads too, so that we can use a possible sync skip
1541 * implementation. */
1542 g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1544 g_task_run_in_thread (task, skip_async_thread);
1545 g_object_unref (task);
1549 /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1551 /* There is a custom async read function, lets use that. */
1552 data = g_new (SkipFallbackAsyncData, 1);
1553 data->count = count;
1554 data->count_skipped = 0;
1555 g_task_set_task_data (task, data, g_free);
1556 g_task_set_check_cancellable (task, FALSE);
1557 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1558 skip_callback_wrapper, task);
1564 g_input_stream_real_skip_finish (GInputStream *stream,
1565 GAsyncResult *result,
1568 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1570 return g_task_propagate_int (G_TASK (result), error);
1574 close_async_thread (GTask *task,
1575 gpointer source_object,
1577 GCancellable *cancellable)
1579 GInputStream *stream = source_object;
1580 GInputStreamClass *class;
1581 GError *error = NULL;
1584 class = G_INPUT_STREAM_GET_CLASS (stream);
1585 if (class->close_fn)
1587 result = class->close_fn (stream,
1588 g_task_get_cancellable (task),
1592 g_task_return_error (task, error);
1597 g_task_return_boolean (task, TRUE);
1601 g_input_stream_real_close_async (GInputStream *stream,
1603 GCancellable *cancellable,
1604 GAsyncReadyCallback callback,
1609 task = g_task_new (stream, cancellable, callback, user_data);
1610 g_task_set_source_tag (task, g_input_stream_real_close_async);
1611 g_task_set_check_cancellable (task, FALSE);
1612 g_task_set_priority (task, io_priority);
1614 g_task_run_in_thread (task, close_async_thread);
1615 g_object_unref (task);
1619 g_input_stream_real_close_finish (GInputStream *stream,
1620 GAsyncResult *result,
1623 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1625 return g_task_propagate_boolean (G_TASK (result), error);