1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 * Author: Alexander Larsson <alexl@redhat.com>
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
31 #include "gpollableinputstream.h"
34 * SECTION:ginputstream
35 * @short_description: Base class for implementing streaming input
38 * #GInputStream has functions to read from a stream (g_input_stream_read()),
39 * to close a stream (g_input_stream_close()) and to skip some content
40 * (g_input_stream_skip()).
42 * To copy the content of an input stream to an output stream without
43 * manually handling the reads and writes, use g_output_stream_splice().
45 * See the documentation for #GIOStream for details of thread safety of
48 * All of these functions have async variants too.
51 struct _GInputStreamPrivate {
54 GAsyncReadyCallback outstanding_callback;
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
59 static gssize g_input_stream_real_skip (GInputStream *stream,
61 GCancellable *cancellable,
63 static void g_input_stream_real_read_async (GInputStream *stream,
67 GCancellable *cancellable,
68 GAsyncReadyCallback callback,
70 static gssize g_input_stream_real_read_finish (GInputStream *stream,
73 static void g_input_stream_real_skip_async (GInputStream *stream,
76 GCancellable *cancellable,
77 GAsyncReadyCallback callback,
79 static gssize g_input_stream_real_skip_finish (GInputStream *stream,
82 static void g_input_stream_real_close_async (GInputStream *stream,
84 GCancellable *cancellable,
85 GAsyncReadyCallback callback,
87 static gboolean g_input_stream_real_close_finish (GInputStream *stream,
92 g_input_stream_dispose (GObject *object)
96 stream = G_INPUT_STREAM (object);
98 if (!stream->priv->closed)
99 g_input_stream_close (stream, NULL, NULL);
101 G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
106 g_input_stream_class_init (GInputStreamClass *klass)
108 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
110 gobject_class->dispose = g_input_stream_dispose;
112 klass->skip = g_input_stream_real_skip;
113 klass->read_async = g_input_stream_real_read_async;
114 klass->read_finish = g_input_stream_real_read_finish;
115 klass->skip_async = g_input_stream_real_skip_async;
116 klass->skip_finish = g_input_stream_real_skip_finish;
117 klass->close_async = g_input_stream_real_close_async;
118 klass->close_finish = g_input_stream_real_close_finish;
122 g_input_stream_init (GInputStream *stream)
124 stream->priv = g_input_stream_get_instance_private (stream);
128 * g_input_stream_read:
129 * @stream: a #GInputStream.
130 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131 * a buffer to read data into (which should be at least count bytes long).
132 * @count: (in): the number of bytes that will be read from the stream
133 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134 * @error: location to store the error occurring, or %NULL to ignore
136 * Tries to read @count bytes from the stream into the buffer starting at
137 * @buffer. Will block during this read.
139 * If count is zero returns zero and does nothing. A value of @count
140 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
142 * On success, the number of bytes read into the buffer is returned.
143 * It is not an error if this is not the same as the requested size, as it
144 * can happen e.g. near the end of a file. Zero is returned on end of file
145 * (or if @count is zero), but never otherwise.
147 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148 * at any position, and this function doesn't nul-terminate the @buffer.
150 * If @cancellable is not %NULL, then the operation can be cancelled by
151 * triggering the cancellable object from another thread. If the operation
152 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153 * operation was partially finished when the operation was cancelled the
154 * partial result will be returned, without an error.
156 * On error -1 is returned and @error is set accordingly.
158 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
161 g_input_stream_read (GInputStream *stream,
164 GCancellable *cancellable,
167 GInputStreamClass *class;
170 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171 g_return_val_if_fail (buffer != NULL, 0);
176 if (((gssize) count) < 0)
178 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179 _("Too large count value passed to %s"), G_STRFUNC);
183 class = G_INPUT_STREAM_GET_CLASS (stream);
185 if (class->read_fn == NULL)
187 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188 _("Input stream doesn’t implement read"));
192 if (!g_input_stream_set_pending (stream, error))
196 g_cancellable_push_current (cancellable);
198 res = class->read_fn (stream, buffer, count, cancellable, error);
201 g_cancellable_pop_current (cancellable);
203 g_input_stream_clear_pending (stream);
209 * g_input_stream_read_all:
210 * @stream: a #GInputStream.
211 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212 * a buffer to read data into (which should be at least count bytes long).
213 * @count: (in): the number of bytes that will be read from the stream
214 * @bytes_read: (out): location to store the number of bytes that was read from the stream
215 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216 * @error: location to store the error occurring, or %NULL to ignore
218 * Tries to read @count bytes from the stream into the buffer starting at
219 * @buffer. Will block during this read.
221 * This function is similar to g_input_stream_read(), except it tries to
222 * read as many bytes as requested, only stopping on an error or end of stream.
224 * On a successful read of @count bytes, or if we reached the end of the
225 * stream, %TRUE is returned, and @bytes_read is set to the number of bytes
228 * If there is an error during the operation %FALSE is returned and @error
229 * is set to indicate the error status.
231 * As a special exception to the normal conventions for functions that
232 * use #GError, if this function returns %FALSE (and sets @error) then
233 * @bytes_read will be set to the number of bytes that were successfully
234 * read before the error was encountered. This functionality is only
235 * available from C. If you need it from another language then you must
236 * write your own loop around g_input_stream_read().
238 * Returns: %TRUE on success, %FALSE if there was an error
241 g_input_stream_read_all (GInputStream *stream,
245 GCancellable *cancellable,
251 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252 g_return_val_if_fail (buffer != NULL, FALSE);
255 while (_bytes_read < count)
257 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
262 *bytes_read = _bytes_read;
273 *bytes_read = _bytes_read;
278 * g_input_stream_read_bytes:
279 * @stream: a #GInputStream.
280 * @count: maximum number of bytes that will be read from the stream. Common
281 * values include 4096 and 8192.
282 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283 * @error: location to store the error occurring, or %NULL to ignore
285 * Like g_input_stream_read(), this tries to read @count bytes from
286 * the stream in a blocking fashion. However, rather than reading into
287 * a user-supplied buffer, this will create a new #GBytes containing
288 * the data that was read. This may be easier to use from language
291 * If count is zero, returns a zero-length #GBytes and does nothing. A
292 * value of @count larger than %G_MAXSSIZE will cause a
293 * %G_IO_ERROR_INVALID_ARGUMENT error.
295 * On success, a new #GBytes is returned. It is not an error if the
296 * size of this object is not the same as the requested size, as it
297 * can happen e.g. near the end of a file. A zero-length #GBytes is
298 * returned on end of file (or if @count is zero), but never
301 * If @cancellable is not %NULL, then the operation can be cancelled by
302 * triggering the cancellable object from another thread. If the operation
303 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304 * operation was partially finished when the operation was cancelled the
305 * partial result will be returned, without an error.
307 * On error %NULL is returned and @error is set accordingly.
309 * Returns: (transfer full): a new #GBytes, or %NULL on error
314 g_input_stream_read_bytes (GInputStream *stream,
316 GCancellable *cancellable,
322 buf = g_malloc (count);
323 nread = g_input_stream_read (stream, buf, count, cancellable, error);
332 return g_bytes_new_static ("", 0);
335 return g_bytes_new_take (buf, nread);
339 * g_input_stream_skip:
340 * @stream: a #GInputStream.
341 * @count: the number of bytes that will be skipped from the stream
342 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
343 * @error: location to store the error occurring, or %NULL to ignore
345 * Tries to skip @count bytes from the stream. Will block during the operation.
347 * This is identical to g_input_stream_read(), from a behaviour standpoint,
348 * but the bytes that are skipped are not returned to the user. Some
349 * streams have an implementation that is more efficient than reading the data.
351 * This function is optional for inherited classes, as the default implementation
352 * emulates it using read.
354 * If @cancellable is not %NULL, then the operation can be cancelled by
355 * triggering the cancellable object from another thread. If the operation
356 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357 * operation was partially finished when the operation was cancelled the
358 * partial result will be returned, without an error.
360 * Returns: Number of bytes skipped, or -1 on error
363 g_input_stream_skip (GInputStream *stream,
365 GCancellable *cancellable,
368 GInputStreamClass *class;
371 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
376 if (((gssize) count) < 0)
378 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379 _("Too large count value passed to %s"), G_STRFUNC);
383 class = G_INPUT_STREAM_GET_CLASS (stream);
385 if (!g_input_stream_set_pending (stream, error))
389 g_cancellable_push_current (cancellable);
391 res = class->skip (stream, count, cancellable, error);
394 g_cancellable_pop_current (cancellable);
396 g_input_stream_clear_pending (stream);
402 g_input_stream_real_skip (GInputStream *stream,
404 GCancellable *cancellable,
407 GInputStreamClass *class;
408 gssize ret, read_bytes;
412 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
414 GSeekable *seekable = G_SEEKABLE (stream);
418 /* g_seekable_seek() may try to set pending itself */
419 stream->priv->pending = FALSE;
421 start = g_seekable_tell (seekable);
423 if (g_seekable_seek (G_SEEKABLE (stream),
429 end = g_seekable_tell (seekable);
430 g_assert (start >= 0);
431 g_assert (end >= start);
432 if (start > (goffset) (G_MAXOFFSET - count) ||
433 (goffset) (start + count) > end)
435 stream->priv->pending = TRUE;
439 success = g_seekable_seek (G_SEEKABLE (stream),
444 stream->priv->pending = TRUE;
453 /* If not seekable, or seek failed, fall back to reading data: */
455 class = G_INPUT_STREAM_GET_CLASS (stream);
462 ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
463 cancellable, &my_error);
466 if (read_bytes > 0 &&
467 my_error->domain == G_IO_ERROR &&
468 my_error->code == G_IO_ERROR_CANCELLED)
470 g_error_free (my_error);
474 g_propagate_error (error, my_error);
481 if (ret == 0 || count == 0)
487 * g_input_stream_close:
488 * @stream: A #GInputStream.
489 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
490 * @error: location to store the error occurring, or %NULL to ignore
492 * Closes the stream, releasing resources related to it.
494 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
495 * Closing a stream multiple times will not return an error.
497 * Streams will be automatically closed when the last reference
498 * is dropped, but you might want to call this function to make sure
499 * resources are released as early as possible.
501 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
502 * open after the stream is closed. See the documentation for the individual
503 * stream for details.
505 * On failure the first error that happened will be reported, but the close
506 * operation will finish as much as possible. A stream that failed to
507 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
508 * is important to check and report the error to the user.
510 * If @cancellable is not %NULL, then the operation can be cancelled by
511 * triggering the cancellable object from another thread. If the operation
512 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
513 * Cancelling a close will still leave the stream closed, but some streams
514 * can use a faster close that doesn't block to e.g. check errors.
516 * Returns: %TRUE on success, %FALSE on failure
519 g_input_stream_close (GInputStream *stream,
520 GCancellable *cancellable,
523 GInputStreamClass *class;
526 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
528 class = G_INPUT_STREAM_GET_CLASS (stream);
530 if (stream->priv->closed)
535 if (!g_input_stream_set_pending (stream, error))
539 g_cancellable_push_current (cancellable);
542 res = class->close_fn (stream, cancellable, error);
545 g_cancellable_pop_current (cancellable);
547 g_input_stream_clear_pending (stream);
549 stream->priv->closed = TRUE;
555 async_ready_callback_wrapper (GObject *source_object,
559 GInputStream *stream = G_INPUT_STREAM (source_object);
561 g_input_stream_clear_pending (stream);
562 if (stream->priv->outstanding_callback)
563 (*stream->priv->outstanding_callback) (source_object, res, user_data);
564 g_object_unref (stream);
568 async_ready_close_callback_wrapper (GObject *source_object,
572 GInputStream *stream = G_INPUT_STREAM (source_object);
574 g_input_stream_clear_pending (stream);
575 stream->priv->closed = TRUE;
576 if (stream->priv->outstanding_callback)
577 (*stream->priv->outstanding_callback) (source_object, res, user_data);
578 g_object_unref (stream);
582 * g_input_stream_read_async:
583 * @stream: A #GInputStream.
584 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
585 * a buffer to read data into (which should be at least count bytes long).
586 * @count: (in): the number of bytes that will be read from the stream
587 * @io_priority: the [I/O priority][io-priority]
589 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
590 * @callback: (scope async): callback to call when the request is satisfied
591 * @user_data: (closure): the data to pass to callback function
593 * Request an asynchronous read of @count bytes from the stream into the buffer
594 * starting at @buffer. When the operation is finished @callback will be called.
595 * You can then call g_input_stream_read_finish() to get the result of the
598 * During an async request no other sync and async calls are allowed on @stream, and will
599 * result in %G_IO_ERROR_PENDING errors.
601 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
603 * On success, the number of bytes read into the buffer will be passed to the
604 * callback. It is not an error if this is not the same as the requested size, as it
605 * can happen e.g. near the end of a file, but generally we try to read
606 * as many bytes as requested. Zero is returned on end of file
607 * (or if @count is zero), but never otherwise.
609 * Any outstanding i/o request with higher priority (lower numerical value) will
610 * be executed before an outstanding request with lower priority. Default
611 * priority is %G_PRIORITY_DEFAULT.
613 * The asynchronous methods have a default fallback that uses threads to implement
614 * asynchronicity, so they are optional for inheriting classes. However, if you
615 * override one you must override all.
618 g_input_stream_read_async (GInputStream *stream,
622 GCancellable *cancellable,
623 GAsyncReadyCallback callback,
626 GInputStreamClass *class;
627 GError *error = NULL;
629 g_return_if_fail (G_IS_INPUT_STREAM (stream));
630 g_return_if_fail (buffer != NULL);
636 task = g_task_new (stream, cancellable, callback, user_data);
637 g_task_set_source_tag (task, g_input_stream_read_async);
638 g_task_return_int (task, 0);
639 g_object_unref (task);
643 if (((gssize) count) < 0)
645 g_task_report_new_error (stream, callback, user_data,
646 g_input_stream_read_async,
647 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
648 _("Too large count value passed to %s"),
653 if (!g_input_stream_set_pending (stream, &error))
655 g_task_report_error (stream, callback, user_data,
656 g_input_stream_read_async,
661 class = G_INPUT_STREAM_GET_CLASS (stream);
662 stream->priv->outstanding_callback = callback;
663 g_object_ref (stream);
664 class->read_async (stream, buffer, count, io_priority, cancellable,
665 async_ready_callback_wrapper, user_data);
669 * g_input_stream_read_finish:
670 * @stream: a #GInputStream.
671 * @result: a #GAsyncResult.
672 * @error: a #GError location to store the error occurring, or %NULL to
675 * Finishes an asynchronous stream read operation.
677 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
680 g_input_stream_read_finish (GInputStream *stream,
681 GAsyncResult *result,
684 GInputStreamClass *class;
686 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
687 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
689 if (g_async_result_legacy_propagate_error (result, error))
691 else if (g_async_result_is_tagged (result, g_input_stream_read_async))
692 return g_task_propagate_int (G_TASK (result), error);
694 class = G_INPUT_STREAM_GET_CLASS (stream);
695 return class->read_finish (stream, result, error);
706 free_async_read_all (gpointer data)
708 g_slice_free (AsyncReadAll, data);
712 read_all_callback (GObject *stream,
713 GAsyncResult *result,
716 GTask *task = user_data;
717 AsyncReadAll *data = g_task_get_task_data (task);
718 gboolean got_eof = FALSE;
722 GError *error = NULL;
725 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
729 g_task_return_error (task, error);
730 g_object_unref (task);
734 g_assert_cmpint (nread, <=, data->to_read);
735 data->to_read -= nread;
736 data->bytes_read += nread;
737 got_eof = (nread == 0);
740 if (got_eof || data->to_read == 0)
742 g_task_return_boolean (task, TRUE);
743 g_object_unref (task);
747 g_input_stream_read_async (G_INPUT_STREAM (stream),
748 data->buffer + data->bytes_read,
750 g_task_get_priority (task),
751 g_task_get_cancellable (task),
752 read_all_callback, task);
757 read_all_async_thread (GTask *task,
758 gpointer source_object,
760 GCancellable *cancellable)
762 GInputStream *stream = source_object;
763 AsyncReadAll *data = task_data;
764 GError *error = NULL;
766 if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
767 g_task_get_cancellable (task), &error))
768 g_task_return_boolean (task, TRUE);
770 g_task_return_error (task, error);
774 * g_input_stream_read_all_async:
775 * @stream: A #GInputStream
776 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
777 * a buffer to read data into (which should be at least count bytes long)
778 * @count: (in): the number of bytes that will be read from the stream
779 * @io_priority: the [I/O priority][io-priority] of the request
780 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
781 * @callback: (scope async): callback to call when the request is satisfied
782 * @user_data: (closure): the data to pass to callback function
784 * Request an asynchronous read of @count bytes from the stream into the
785 * buffer starting at @buffer.
787 * This is the asynchronous equivalent of g_input_stream_read_all().
789 * Call g_input_stream_read_all_finish() to collect the result.
791 * Any outstanding I/O request with higher priority (lower numerical
792 * value) will be executed before an outstanding request with lower
793 * priority. Default priority is %G_PRIORITY_DEFAULT.
798 g_input_stream_read_all_async (GInputStream *stream,
802 GCancellable *cancellable,
803 GAsyncReadyCallback callback,
809 g_return_if_fail (G_IS_INPUT_STREAM (stream));
810 g_return_if_fail (buffer != NULL || count == 0);
812 task = g_task_new (stream, cancellable, callback, user_data);
813 data = g_slice_new0 (AsyncReadAll);
814 data->buffer = buffer;
815 data->to_read = count;
817 g_task_set_source_tag (task, g_input_stream_read_all_async);
818 g_task_set_task_data (task, data, free_async_read_all);
819 g_task_set_priority (task, io_priority);
821 /* If async reads are going to be handled via the threadpool anyway
822 * then we may as well do it with a single dispatch instead of
823 * bouncing in and out.
825 if (g_input_stream_async_read_is_via_threads (stream))
827 g_task_run_in_thread (task, read_all_async_thread);
828 g_object_unref (task);
831 read_all_callback (G_OBJECT (stream), NULL, task);
835 * g_input_stream_read_all_finish:
836 * @stream: a #GInputStream
837 * @result: a #GAsyncResult
838 * @bytes_read: (out): location to store the number of bytes that was read from the stream
839 * @error: a #GError location to store the error occurring, or %NULL to ignore
841 * Finishes an asynchronous stream read operation started with
842 * g_input_stream_read_all_async().
844 * As a special exception to the normal conventions for functions that
845 * use #GError, if this function returns %FALSE (and sets @error) then
846 * @bytes_read will be set to the number of bytes that were successfully
847 * read before the error was encountered. This functionality is only
848 * available from C. If you need it from another language then you must
849 * write your own loop around g_input_stream_read_async().
851 * Returns: %TRUE on success, %FALSE if there was an error
856 g_input_stream_read_all_finish (GInputStream *stream,
857 GAsyncResult *result,
863 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
864 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
866 task = G_TASK (result);
870 AsyncReadAll *data = g_task_get_task_data (task);
872 *bytes_read = data->bytes_read;
875 return g_task_propagate_boolean (task, error);
879 read_bytes_callback (GObject *stream,
880 GAsyncResult *result,
883 GTask *task = user_data;
884 guchar *buf = g_task_get_task_data (task);
885 GError *error = NULL;
887 GBytes *bytes = NULL;
889 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
894 g_task_return_error (task, error);
899 bytes = g_bytes_new_static ("", 0);
902 bytes = g_bytes_new_take (buf, nread);
905 g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
907 g_object_unref (task);
911 * g_input_stream_read_bytes_async:
912 * @stream: A #GInputStream.
913 * @count: the number of bytes that will be read from the stream
914 * @io_priority: the [I/O priority][io-priority] of the request
915 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
916 * @callback: (scope async): callback to call when the request is satisfied
917 * @user_data: (closure): the data to pass to callback function
919 * Request an asynchronous read of @count bytes from the stream into a
920 * new #GBytes. When the operation is finished @callback will be
921 * called. You can then call g_input_stream_read_bytes_finish() to get the
922 * result of the operation.
924 * During an async request no other sync and async calls are allowed
925 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
927 * A value of @count larger than %G_MAXSSIZE will cause a
928 * %G_IO_ERROR_INVALID_ARGUMENT error.
930 * On success, the new #GBytes will be passed to the callback. It is
931 * not an error if this is smaller than the requested size, as it can
932 * happen e.g. near the end of a file, but generally we try to read as
933 * many bytes as requested. Zero is returned on end of file (or if
934 * @count is zero), but never otherwise.
936 * Any outstanding I/O request with higher priority (lower numerical
937 * value) will be executed before an outstanding request with lower
938 * priority. Default priority is %G_PRIORITY_DEFAULT.
943 g_input_stream_read_bytes_async (GInputStream *stream,
946 GCancellable *cancellable,
947 GAsyncReadyCallback callback,
953 task = g_task_new (stream, cancellable, callback, user_data);
954 g_task_set_source_tag (task, g_input_stream_read_bytes_async);
956 buf = g_malloc (count);
957 g_task_set_task_data (task, buf, NULL);
959 g_input_stream_read_async (stream, buf, count,
960 io_priority, cancellable,
961 read_bytes_callback, task);
965 * g_input_stream_read_bytes_finish:
966 * @stream: a #GInputStream.
967 * @result: a #GAsyncResult.
968 * @error: a #GError location to store the error occurring, or %NULL to
971 * Finishes an asynchronous stream read-into-#GBytes operation.
973 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
978 g_input_stream_read_bytes_finish (GInputStream *stream,
979 GAsyncResult *result,
982 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
983 g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
985 return g_task_propagate_pointer (G_TASK (result), error);
989 * g_input_stream_skip_async:
990 * @stream: A #GInputStream.
991 * @count: the number of bytes that will be skipped from the stream
992 * @io_priority: the [I/O priority][io-priority] of the request
993 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
994 * @callback: (scope async): callback to call when the request is satisfied
995 * @user_data: (closure): the data to pass to callback function
997 * Request an asynchronous skip of @count bytes from the stream.
998 * When the operation is finished @callback will be called.
999 * You can then call g_input_stream_skip_finish() to get the result
1002 * During an async request no other sync and async calls are allowed,
1003 * and will result in %G_IO_ERROR_PENDING errors.
1005 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1007 * On success, the number of bytes skipped will be passed to the callback.
1008 * It is not an error if this is not the same as the requested size, as it
1009 * can happen e.g. near the end of a file, but generally we try to skip
1010 * as many bytes as requested. Zero is returned on end of file
1011 * (or if @count is zero), but never otherwise.
1013 * Any outstanding i/o request with higher priority (lower numerical value)
1014 * will be executed before an outstanding request with lower priority.
1015 * Default priority is %G_PRIORITY_DEFAULT.
1017 * The asynchronous methods have a default fallback that uses threads to
1018 * implement asynchronicity, so they are optional for inheriting classes.
1019 * However, if you override one, you must override all.
1022 g_input_stream_skip_async (GInputStream *stream,
1025 GCancellable *cancellable,
1026 GAsyncReadyCallback callback,
1029 GInputStreamClass *class;
1030 GError *error = NULL;
1032 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1038 task = g_task_new (stream, cancellable, callback, user_data);
1039 g_task_set_source_tag (task, g_input_stream_skip_async);
1040 g_task_return_int (task, 0);
1041 g_object_unref (task);
1045 if (((gssize) count) < 0)
1047 g_task_report_new_error (stream, callback, user_data,
1048 g_input_stream_skip_async,
1049 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1050 _("Too large count value passed to %s"),
1055 if (!g_input_stream_set_pending (stream, &error))
1057 g_task_report_error (stream, callback, user_data,
1058 g_input_stream_skip_async,
1063 class = G_INPUT_STREAM_GET_CLASS (stream);
1064 stream->priv->outstanding_callback = callback;
1065 g_object_ref (stream);
1066 class->skip_async (stream, count, io_priority, cancellable,
1067 async_ready_callback_wrapper, user_data);
1071 * g_input_stream_skip_finish:
1072 * @stream: a #GInputStream.
1073 * @result: a #GAsyncResult.
1074 * @error: a #GError location to store the error occurring, or %NULL to
1077 * Finishes a stream skip operation.
1079 * Returns: the size of the bytes skipped, or `-1` on error.
1082 g_input_stream_skip_finish (GInputStream *stream,
1083 GAsyncResult *result,
1086 GInputStreamClass *class;
1088 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1089 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1091 if (g_async_result_legacy_propagate_error (result, error))
1093 else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1094 return g_task_propagate_int (G_TASK (result), error);
1096 class = G_INPUT_STREAM_GET_CLASS (stream);
1097 return class->skip_finish (stream, result, error);
1101 * g_input_stream_close_async:
1102 * @stream: A #GInputStream.
1103 * @io_priority: the [I/O priority][io-priority] of the request
1104 * @cancellable: (nullable): optional cancellable object
1105 * @callback: (scope async): callback to call when the request is satisfied
1106 * @user_data: (closure): the data to pass to callback function
1108 * Requests an asynchronous closes of the stream, releasing resources related to it.
1109 * When the operation is finished @callback will be called.
1110 * You can then call g_input_stream_close_finish() to get the result of the
1113 * For behaviour details see g_input_stream_close().
1115 * The asynchronous methods have a default fallback that uses threads to implement
1116 * asynchronicity, so they are optional for inheriting classes. However, if you
1117 * override one you must override all.
1120 g_input_stream_close_async (GInputStream *stream,
1122 GCancellable *cancellable,
1123 GAsyncReadyCallback callback,
1126 GInputStreamClass *class;
1127 GError *error = NULL;
1129 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1131 if (stream->priv->closed)
1135 task = g_task_new (stream, cancellable, callback, user_data);
1136 g_task_set_source_tag (task, g_input_stream_close_async);
1137 g_task_return_boolean (task, TRUE);
1138 g_object_unref (task);
1142 if (!g_input_stream_set_pending (stream, &error))
1144 g_task_report_error (stream, callback, user_data,
1145 g_input_stream_close_async,
1150 class = G_INPUT_STREAM_GET_CLASS (stream);
1151 stream->priv->outstanding_callback = callback;
1152 g_object_ref (stream);
1153 class->close_async (stream, io_priority, cancellable,
1154 async_ready_close_callback_wrapper, user_data);
1158 * g_input_stream_close_finish:
1159 * @stream: a #GInputStream.
1160 * @result: a #GAsyncResult.
1161 * @error: a #GError location to store the error occurring, or %NULL to
1164 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1166 * Returns: %TRUE if the stream was closed successfully.
1169 g_input_stream_close_finish (GInputStream *stream,
1170 GAsyncResult *result,
1173 GInputStreamClass *class;
1175 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1176 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1178 if (g_async_result_legacy_propagate_error (result, error))
1180 else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1181 return g_task_propagate_boolean (G_TASK (result), error);
1183 class = G_INPUT_STREAM_GET_CLASS (stream);
1184 return class->close_finish (stream, result, error);
1188 * g_input_stream_is_closed:
1189 * @stream: input stream.
1191 * Checks if an input stream is closed.
1193 * Returns: %TRUE if the stream is closed.
1196 g_input_stream_is_closed (GInputStream *stream)
1198 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1200 return stream->priv->closed;
1204 * g_input_stream_has_pending:
1205 * @stream: input stream.
1207 * Checks if an input stream has pending actions.
1209 * Returns: %TRUE if @stream has pending actions.
1212 g_input_stream_has_pending (GInputStream *stream)
1214 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1216 return stream->priv->pending;
1220 * g_input_stream_set_pending:
1221 * @stream: input stream
1222 * @error: a #GError location to store the error occurring, or %NULL to
1225 * Sets @stream to have actions pending. If the pending flag is
1226 * already set or @stream is closed, it will return %FALSE and set
1229 * Returns: %TRUE if pending was previously unset and is now set.
1232 g_input_stream_set_pending (GInputStream *stream, GError **error)
1234 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1236 if (stream->priv->closed)
1238 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1239 _("Stream is already closed"));
1243 if (stream->priv->pending)
1245 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1246 /* Translators: This is an error you get if there is already an
1247 * operation running against this stream when you try to start
1249 _("Stream has outstanding operation"));
1253 stream->priv->pending = TRUE;
1258 * g_input_stream_clear_pending:
1259 * @stream: input stream
1261 * Clears the pending flag on @stream.
1264 g_input_stream_clear_pending (GInputStream *stream)
1266 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1268 stream->priv->pending = FALSE;
1272 * g_input_stream_async_read_is_via_threads:
1273 * @stream: input stream
1275 * Checks if an input stream's read_async function uses threads.
1277 * Returns: %TRUE if @stream's read_async function uses threads.
1280 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1282 GInputStreamClass *class;
1284 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1286 class = G_INPUT_STREAM_GET_CLASS (stream);
1288 return (class->read_async == g_input_stream_real_read_async &&
1289 !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1290 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1294 * g_input_stream_async_close_is_via_threads:
1295 * @stream: input stream
1297 * Checks if an input stream's close_async function uses threads.
1299 * Returns: %TRUE if @stream's close_async function uses threads.
1302 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1304 GInputStreamClass *class;
1306 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1308 class = G_INPUT_STREAM_GET_CLASS (stream);
1310 return class->close_async == g_input_stream_real_close_async;
1313 /********************************************
1314 * Default implementation of async ops *
1315 ********************************************/
1323 free_read_data (ReadData *op)
1325 g_slice_free (ReadData, op);
1329 read_async_thread (GTask *task,
1330 gpointer source_object,
1332 GCancellable *cancellable)
1334 GInputStream *stream = source_object;
1335 ReadData *op = task_data;
1336 GInputStreamClass *class;
1337 GError *error = NULL;
1340 class = G_INPUT_STREAM_GET_CLASS (stream);
1342 nread = class->read_fn (stream,
1343 op->buffer, op->count,
1344 g_task_get_cancellable (task),
1347 g_task_return_error (task, error);
1349 g_task_return_int (task, nread);
1352 static void read_async_pollable (GPollableInputStream *stream,
1356 read_async_pollable_ready (GPollableInputStream *stream,
1359 GTask *task = user_data;
1361 read_async_pollable (stream, task);
1366 read_async_pollable (GPollableInputStream *stream,
1369 ReadData *op = g_task_get_task_data (task);
1370 GError *error = NULL;
1373 if (g_task_return_error_if_cancelled (task))
1376 nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1377 read_nonblocking (stream, op->buffer, op->count, &error);
1379 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1383 g_error_free (error);
1385 source = g_pollable_input_stream_create_source (stream,
1386 g_task_get_cancellable (task));
1387 g_task_attach_source (task, source,
1388 (GSourceFunc) read_async_pollable_ready);
1389 g_source_unref (source);
1394 g_task_return_error (task, error);
1396 g_task_return_int (task, nread);
1397 /* g_input_stream_real_read_async() unrefs task */
1402 g_input_stream_real_read_async (GInputStream *stream,
1406 GCancellable *cancellable,
1407 GAsyncReadyCallback callback,
1413 op = g_slice_new0 (ReadData);
1414 task = g_task_new (stream, cancellable, callback, user_data);
1415 g_task_set_source_tag (task, g_input_stream_real_read_async);
1416 g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1417 g_task_set_priority (task, io_priority);
1418 op->buffer = buffer;
1421 if (!g_input_stream_async_read_is_via_threads (stream))
1422 read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1424 g_task_run_in_thread (task, read_async_thread);
1425 g_object_unref (task);
1429 g_input_stream_real_read_finish (GInputStream *stream,
1430 GAsyncResult *result,
1433 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1435 return g_task_propagate_int (G_TASK (result), error);
1440 skip_async_thread (GTask *task,
1441 gpointer source_object,
1443 GCancellable *cancellable)
1445 GInputStream *stream = source_object;
1446 gsize count = GPOINTER_TO_SIZE (task_data);
1447 GInputStreamClass *class;
1448 GError *error = NULL;
1451 class = G_INPUT_STREAM_GET_CLASS (stream);
1452 ret = class->skip (stream, count,
1453 g_task_get_cancellable (task),
1456 g_task_return_error (task, error);
1458 g_task_return_int (task, ret);
1464 gsize count_skipped;
1465 } SkipFallbackAsyncData;
1468 skip_callback_wrapper (GObject *source_object,
1472 GInputStreamClass *class;
1473 GTask *task = user_data;
1474 SkipFallbackAsyncData *data = g_task_get_task_data (task);
1475 GError *error = NULL;
1478 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1483 data->count_skipped += ret;
1485 if (data->count > 0)
1487 class = G_INPUT_STREAM_GET_CLASS (source_object);
1488 class->read_async (G_INPUT_STREAM (source_object),
1489 data->buffer, MIN (8192, data->count),
1490 g_task_get_priority (task),
1491 g_task_get_cancellable (task),
1492 skip_callback_wrapper, task);
1498 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1499 data->count_skipped)
1501 /* No error, return partial read */
1502 g_clear_error (&error);
1506 g_task_return_error (task, error);
1508 g_task_return_int (task, data->count_skipped);
1509 g_object_unref (task);
1513 g_input_stream_real_skip_async (GInputStream *stream,
1516 GCancellable *cancellable,
1517 GAsyncReadyCallback callback,
1520 GInputStreamClass *class;
1521 SkipFallbackAsyncData *data;
1524 class = G_INPUT_STREAM_GET_CLASS (stream);
1526 task = g_task_new (stream, cancellable, callback, user_data);
1527 g_task_set_source_tag (task, g_input_stream_real_skip_async);
1528 g_task_set_priority (task, io_priority);
1530 if (g_input_stream_async_read_is_via_threads (stream))
1532 /* Read is thread-using async fallback.
1533 * Make skip use threads too, so that we can use a possible sync skip
1534 * implementation. */
1535 g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1537 g_task_run_in_thread (task, skip_async_thread);
1538 g_object_unref (task);
1542 /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1544 /* There is a custom async read function, lets use that. */
1545 data = g_new (SkipFallbackAsyncData, 1);
1546 data->count = count;
1547 data->count_skipped = 0;
1548 g_task_set_task_data (task, data, g_free);
1549 g_task_set_check_cancellable (task, FALSE);
1550 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1551 skip_callback_wrapper, task);
1557 g_input_stream_real_skip_finish (GInputStream *stream,
1558 GAsyncResult *result,
1561 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1563 return g_task_propagate_int (G_TASK (result), error);
1567 close_async_thread (GTask *task,
1568 gpointer source_object,
1570 GCancellable *cancellable)
1572 GInputStream *stream = source_object;
1573 GInputStreamClass *class;
1574 GError *error = NULL;
1577 class = G_INPUT_STREAM_GET_CLASS (stream);
1578 if (class->close_fn)
1580 result = class->close_fn (stream,
1581 g_task_get_cancellable (task),
1585 g_task_return_error (task, error);
1590 g_task_return_boolean (task, TRUE);
1594 g_input_stream_real_close_async (GInputStream *stream,
1596 GCancellable *cancellable,
1597 GAsyncReadyCallback callback,
1602 task = g_task_new (stream, cancellable, callback, user_data);
1603 g_task_set_source_tag (task, g_input_stream_real_close_async);
1604 g_task_set_check_cancellable (task, FALSE);
1605 g_task_set_priority (task, io_priority);
1607 g_task_run_in_thread (task, close_async_thread);
1608 g_object_unref (task);
1612 g_input_stream_real_close_finish (GInputStream *stream,
1613 GAsyncResult *result,
1616 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1618 return g_task_propagate_boolean (G_TASK (result), error);