1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Alexander Larsson <alexl@redhat.com>
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gsimpleasyncresult.h"
/* Type registration for GInputStream: the macro is given the type name, the
 * g_input_stream function prefix and G_TYPE_OBJECT as the parent type. */
31 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
/* Per-instance private state of a GInputStream.
 * NOTE(review): the rest of this file also uses priv->closed and
 * priv->pending; their declarations are not visible in this excerpt. */
33 struct _GInputStreamPrivate {
/* Caller callback of the async operation in flight. It is stored by the
 * public async entry points and invoked by the async_ready wrappers. */
36 GAsyncReadyCallback outstanding_callback;
/* Forward declarations of the default (fallback) implementations that
 * g_input_stream_class_init installs into the class method table.
 * Only part of each parameter list is visible in this excerpt. */
39 static gssize g_input_stream_real_skip (GInputStream *stream,
41 GCancellable *cancellable,
43 static void g_input_stream_real_read_async (GInputStream *stream,
47 GCancellable *cancellable,
48 GAsyncReadyCallback callback,
50 static gssize g_input_stream_real_read_finish (GInputStream *stream,
53 static void g_input_stream_real_skip_async (GInputStream *stream,
56 GCancellable *cancellable,
57 GAsyncReadyCallback callback,
59 static gssize g_input_stream_real_skip_finish (GInputStream *stream,
62 static void g_input_stream_real_close_async (GInputStream *stream,
64 GCancellable *cancellable,
65 GAsyncReadyCallback callback,
67 static gboolean g_input_stream_real_close_finish (GInputStream *stream,
/* GObject finalize handler (installed in g_input_stream_class_init).
 * Closes the stream if it is not yet marked closed, then chains up to the
 * parent class finalize handler when one is set. */
72 g_input_stream_finalize (GObject *object)
76 stream = G_INPUT_STREAM (object);
/* NULL is passed for both the cancellable and the error location, so a
 * failure to close is not reported from here. */
78 if (!stream->priv->closed)
79 g_input_stream_close (stream, NULL, NULL);
/* Chain up to the parent class. */
81 if (G_OBJECT_CLASS (g_input_stream_parent_class)->finalize)
82 (*G_OBJECT_CLASS (g_input_stream_parent_class)->finalize) (object);
/* GObject dispose handler (installed in g_input_stream_class_init).
 * Closes the stream if it is not yet marked closed, then chains up to the
 * parent class dispose handler when one is set. */
86 g_input_stream_dispose (GObject *object)
90 stream = G_INPUT_STREAM (object);
/* NULL is passed for both the cancellable and the error location, so a
 * failure to close is not reported from here. */
92 if (!stream->priv->closed)
93 g_input_stream_close (stream, NULL, NULL);
/* Chain up to the parent class. */
95 if (G_OBJECT_CLASS (g_input_stream_parent_class)->dispose)
96 (*G_OBJECT_CLASS (g_input_stream_parent_class)->dispose) (object);
/* Class initializer: reserves room for GInputStreamPrivate, hooks the
 * GObject finalize and dispose handlers, and installs the default skip and
 * async implementations.
 * No assignment to klass->read or klass->close appears in the visible lines;
 * g_input_stream_read checks class->read against NULL before using it. */
101 g_input_stream_class_init (GInputStreamClass *klass)
103 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
105 g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
107 gobject_class->finalize = g_input_stream_finalize;
108 gobject_class->dispose = g_input_stream_dispose;
/* Defaults that subclasses may override. */
110 klass->skip = g_input_stream_real_skip;
111 klass->read_async = g_input_stream_real_read_async;
112 klass->read_finish = g_input_stream_real_read_finish;
113 klass->skip_async = g_input_stream_real_skip_async;
114 klass->skip_finish = g_input_stream_real_skip_finish;
115 klass->close_async = g_input_stream_real_close_async;
116 klass->close_finish = g_input_stream_real_close_finish;
/* Instance initializer: looks up the GInputStreamPrivate area of the
 * instance and caches the pointer in stream->priv. */
120 g_input_stream_init (GInputStream *stream)
122 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
124 GInputStreamPrivate);
128 * g_input_stream_read:
129 * @stream: a #GInputStream.
130 * @buffer: a buffer to read data into (which should be at least count bytes long).
131 * @count: the number of bytes that will be read from the stream
132 * @cancellable: optional #GCancellable object, %NULL to ignore.
133 * @error: location to store the error occurring, or %NULL to ignore
135 * Tries to read @count bytes from the stream into the buffer starting at
136 * @buffer. Will block during this read.
138 * If count is zero returns zero and does nothing. A value of @count
139 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141 * On success, the number of bytes read into the buffer is returned.
142 * It is not an error if this is not the same as the requested size, as it
143 * can happen e.g. near the end of a file. Zero is returned on end of file
144 * (or if @count is zero), but never otherwise.
146 * If @cancellable is not NULL, then the operation can be cancelled by
147 * triggering the cancellable object from another thread. If the operation
148 * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
149 * operation was partially finished when the operation was cancelled the
150 * partial result will be returned, without an error.
152 * On error -1 is returned and @error is set accordingly.
154 * Return value: Number of bytes read, or -1 on error
/* Synchronous read entry point: validates the arguments and the stream
 * state, then calls the class read method with the pending flag set around
 * the call. Several lines of this function (some parameters, braces and
 * return statements) are not visible in this excerpt. */
157 g_input_stream_read (GInputStream *stream,
160 GCancellable *cancellable,
163 GInputStreamClass *class;
166 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
/* NOTE(review): this precondition failure yields 0 while the one above
 * yields -1 - confirm that the difference is intended. */
167 g_return_val_if_fail (buffer != NULL, 0);
/* Reject a count that is negative when viewed as gssize (documented above
 * as a count larger than G_MAXSSIZE). */
172 if (((gssize) count) < 0)
174 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
175 _("Too large count value passed to g_input_stream_read"));
179 if (stream->priv->closed)
181 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
182 _("Stream is already closed"));
/* Another operation is already outstanding on this stream. */
186 if (stream->priv->pending)
188 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
189 _("Stream has outstanding operation"));
193 class = G_INPUT_STREAM_GET_CLASS (stream);
/* read has no default installed in the visible part of class_init, so a
 * subclass that leaves it unset is reported as not supported. */
195 if (class->read == NULL)
197 g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
198 _("Input stream doesn't implement read"));
203 g_push_current_cancellable (cancellable);
/* Mark the stream busy for the duration of the class read call. */
205 stream->priv->pending = TRUE;
206 res = class->read (stream, buffer, count, cancellable, error);
207 stream->priv->pending = FALSE;
210 g_pop_current_cancellable (cancellable);
216 * g_input_stream_read_all:
217 * @stream: a #GInputStream.
218 * @buffer: a buffer to read data into (which should be at least count bytes long).
219 * @count: the number of bytes that will be read from the stream
220 * @bytes_read: location to store the number of bytes that was read from the stream
221 * @cancellable: optional #GCancellable object, %NULL to ignore.
222 * @error: location to store the error occurring, or %NULL to ignore
224 * Tries to read @count bytes from the stream into the buffer starting at
225 * @buffer. Will block during this read.
227 * This function is similar to g_input_stream_read(), except it tries to
228 * read as many bytes as requested, only stopping on an error or end of stream.
230 * On a successful read of @count bytes, or if we reached the end of the
231 * stream, TRUE is returned, and @bytes_read is set to the number of bytes
234 * If there is an error during the operation FALSE is returned and @error
235 * is set to indicate the error status, @bytes_read is updated to contain
236 * the number of bytes read into @buffer before the error occurred.
238 * Return value: TRUE on success, FALSE if there was an error
/* Calls g_input_stream_read in a loop while fewer than count bytes have been
 * read, and stores the running total through bytes_read. The loop exit
 * conditions and the statements guarding the two bytes_read stores are not
 * visible in this excerpt. */
241 g_input_stream_read_all (GInputStream *stream,
245 GCancellable *cancellable,
251 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252 g_return_val_if_fail (buffer != NULL, FALSE);
/* Each iteration reads into the still unfilled tail of buffer and asks only
 * for the bytes that are still missing. */
255 while (_bytes_read < count)
257 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
262 *bytes_read = _bytes_read;
273 *bytes_read = _bytes_read;
278 * g_input_stream_skip:
279 * @stream: a #GInputStream.
280 * @count: the number of bytes that will be skipped from the stream
281 * @cancellable: optional #GCancellable object, %NULL to ignore.
282 * @error: location to store the error occurring, or %NULL to ignore
284 * Tries to skip @count bytes from the stream. Will block during the operation.
286 * This is identical to g_input_stream_read(), from a behaviour standpoint,
287 * but the bytes that are skipped are not returned to the user. Some
288 * streams have an implementation that is more efficient than reading the data.
290 * This function is optional for inherited classes, as the default implementation
291 * emulates it using read.
293 * If @cancellable is not %NULL, then the operation can be cancelled by
294 * triggering the cancellable object from another thread. If the operation
295 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
296 * operation was partially finished when the operation was cancelled the
297 * partial result will be returned, without an error.
299 * Return value: Number of bytes skipped, or -1 on error
/* Synchronous skip entry point: validates the count and the stream state,
 * then calls the class skip method with the pending flag set around the
 * call. Several lines of this function (some parameters, braces and return
 * statements) are not visible in this excerpt. */
302 g_input_stream_skip (GInputStream *stream,
304 GCancellable *cancellable,
307 GInputStreamClass *class;
310 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
/* Reject a count that is negative when viewed as gssize. */
315 if (((gssize) count) < 0)
317 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
318 _("Too large count value passed to g_input_stream_skip"));
322 if (stream->priv->closed)
324 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
325 _("Stream is already closed"));
/* Another operation is already outstanding on this stream. */
329 if (stream->priv->pending)
331 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
332 _("Stream has outstanding operation"));
336 class = G_INPUT_STREAM_GET_CLASS (stream);
339 g_push_current_cancellable (cancellable);
/* class_init installs g_input_stream_real_skip as the default skip method.
 * The stream is marked busy for the duration of the call. */
341 stream->priv->pending = TRUE;
342 res = class->skip (stream, count, cancellable, error);
343 stream->priv->pending = FALSE;
346 g_pop_current_cancellable (cancellable);
/* Default skip implementation. When the stream is a GSeekable that reports
 * it can seek, a g_seekable_seek call is attempted first; otherwise, or when
 * the seek fails, data is consumed through the class read method into a
 * scratch buffer (its declaration is not visible in this excerpt). */
352 g_input_stream_real_skip (GInputStream *stream,
354 GCancellable *cancellable,
357 GInputStreamClass *class;
358 gssize ret, read_bytes;
362 class = G_INPUT_STREAM_GET_CLASS (stream);
364 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
366 if (g_seekable_seek (G_SEEKABLE (stream),
374 /* If not seekable, or seek failed, fall back to reading data: */
/* NOTE(review): class was already fetched above with the same expression;
 * this second lookup looks redundant. */
376 class = G_INPUT_STREAM_GET_CLASS (stream);
/* Never ask for more than the scratch buffer holds or than count. */
383 ret = class->read (stream, buffer, MIN (sizeof (buffer), count),
384 cancellable, &my_error);
/* A cancellation after some bytes were already consumed has its error
 * freed here; the other visible path propagates my_error to the caller. */
387 if (read_bytes > 0 &&
388 my_error->domain == G_IO_ERROR &&
389 my_error->code == G_IO_ERROR_CANCELLED)
391 g_error_free (my_error);
395 g_propagate_error (error, my_error);
/* Tested when read returned 0 or when count has reached 0; the guarded
 * statement is not visible in this excerpt. */
402 if (ret == 0 || count == 0)
408 * g_input_stream_close:
409 * @stream: A #GInputStream.
410 * @error: location to store the error occurring, or %NULL to ignore
412 * Closes the stream, releasing resources related to it.
414 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
415 * Closing a stream multiple times will not return an error.
417 * Streams will be automatically closed when the last reference
418 * is dropped, but you might want to call this function to make sure resources
419 * are released as early as possible.
421 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
422 * open after the stream is closed. See the documentation for the individual
423 * stream for details.
425 * On failure the first error that happened will be reported, but the close
426 * operation will finish as much as possible. A stream that failed to
427 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
428 * is important to check and report the error to the user.
430 * If @cancellable is not NULL, then the operation can be cancelled by
431 * triggering the cancellable object from another thread. If the operation
432 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
433 * Cancelling a close will still leave the stream closed, but some streams
434 * can use a faster close that doesn't block to e.g. check errors.
436 * Return value: %TRUE on success, %FALSE on failure
/* Synchronous close entry point: calls the class close method with the
 * pending flag set, then marks the stream closed. Several lines of this
 * function (braces, return statements and possibly guards around the calls)
 * are not visible in this excerpt. */
439 g_input_stream_close (GInputStream *stream,
440 GCancellable *cancellable,
443 GInputStreamClass *class;
446 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
448 class = G_INPUT_STREAM_GET_CLASS (stream);
/* State checks; the statement guarded by the closed check is not visible. */
450 if (stream->priv->closed)
453 if (stream->priv->pending)
455 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
456 _("Stream has outstanding operation"));
/* Mark the stream busy for the duration of the class close call. */
462 stream->priv->pending = TRUE;
465 g_push_current_cancellable (cancellable);
468 res = class->close (stream, cancellable, error);
471 g_pop_current_cancellable (cancellable);
/* In the visible lines closed is set without looking at res, which matches
 * the documentation above: a stream that failed to close still reports
 * itself as closed. */
473 stream->priv->closed = TRUE;
475 stream->priv->pending = FALSE;
/* Completion callback handed to the class read_async and skip_async methods
 * by the public async entry points. It clears the pending flag, forwards to
 * the stored caller callback when one is set, and drops the stream reference
 * taken when the operation was started. */
481 async_ready_callback_wrapper (GObject *source_object,
485 GInputStream *stream = G_INPUT_STREAM (source_object);
/* Cleared before the caller callback runs. */
487 stream->priv->pending = FALSE;
488 if (stream->priv->outstanding_callback)
489 (*stream->priv->outstanding_callback) (source_object, res, user_data);
/* Balances the g_object_ref done by the async entry point. */
490 g_object_unref (stream);
/* Completion callback handed to the class close_async method by
 * g_input_stream_close_async. Same as async_ready_callback_wrapper, except
 * that it also marks the stream closed before forwarding to the stored
 * caller callback. */
494 async_ready_close_callback_wrapper (GObject *source_object,
498 GInputStream *stream = G_INPUT_STREAM (source_object);
/* Both flags are updated before the caller callback runs. */
500 stream->priv->pending = FALSE;
501 stream->priv->closed = TRUE;
502 if (stream->priv->outstanding_callback)
503 (*stream->priv->outstanding_callback) (source_object, res, user_data);
/* Balances the g_object_ref done by g_input_stream_close_async. */
504 g_object_unref (stream);
508 * g_input_stream_read_async:
509 * @stream: A #GInputStream.
510 * @buffer: a buffer to read data into (which should be at least count bytes long).
511 * @count: the number of bytes that will be read from the stream
512 * @io_priority: the io priority of the request.
513 * @cancellable: optional #GCancellable object, %NULL to ignore.
514 * @callback: callback to call when the request is satisfied
515 * @user_data: the data to pass to callback function
517 * Request an asynchronous read of @count bytes from the stream into the buffer
518 * starting at @buffer. When the operation is finished @callback will be called,
519 * giving the results.
521 * During an async request no other sync and async calls are allowed, and will
522 * result in %G_IO_ERROR_PENDING errors.
524 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
526 * On success, the number of bytes read into the buffer will be passed to the
527 * callback. It is not an error if this is not the same as the requested size, as it
528 * can happen e.g. near the end of a file, but generally we try to read
529 * as many bytes as requested. Zero is returned on end of file
530 * (or if @count is zero), but never otherwise.
532 * Any outstanding i/o request with higher priority (lower numerical value) will
533 * be executed before an outstanding request with lower priority. Default
534 * priority is %G_PRIORITY_DEFAULT.
536 * The asynchronous methods have a default fallback that uses threads to implement
537 * asynchronicity, so they are optional for inheriting classes. However, if you
538 * override one you must override all.
/* Asynchronous read entry point. Invalid requests are reported through
 * g_simple_async_report_error_in_idle instead of synchronously; a valid
 * request is handed to the class read_async method. Several lines (some
 * parameters, braces, return statements) are not visible in this excerpt. */
541 g_input_stream_read_async (GInputStream *stream,
545 GCancellable *cancellable,
546 GAsyncReadyCallback callback,
549 GInputStreamClass *class;
550 GSimpleAsyncResult *simple;
552 g_return_if_fail (G_IS_INPUT_STREAM (stream));
553 g_return_if_fail (buffer != NULL);
/* Result completed from an idle callback and tagged with this function;
 * g_input_stream_read_finish tests for that tag in its special case for a
 * read of 0 bytes. The condition guarding this path is not visible here. */
557 simple = g_simple_async_result_new (G_OBJECT (stream),
560 g_input_stream_read_async);
561 g_simple_async_result_complete_in_idle (simple);
562 g_object_unref (simple);
/* Reject a count that is negative when viewed as gssize. */
566 if (((gssize) count) < 0)
568 g_simple_async_report_error_in_idle (G_OBJECT (stream),
571 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
572 _("Too large count value passed to g_input_stream_read_async"));
576 if (stream->priv->closed)
578 g_simple_async_report_error_in_idle (G_OBJECT (stream),
581 G_IO_ERROR, G_IO_ERROR_CLOSED,
582 _("Stream is already closed"));
586 if (stream->priv->pending)
588 g_simple_async_report_error_in_idle (G_OBJECT (stream),
591 G_IO_ERROR, G_IO_ERROR_PENDING,
592 _("Stream has outstanding operation"));
596 class = G_INPUT_STREAM_GET_CLASS (stream);
/* Record the in-flight state and keep the stream alive until
 * async_ready_callback_wrapper releases the reference. The caller callback
 * is reached through that wrapper, not passed to the class method. */
598 stream->priv->pending = TRUE;
599 stream->priv->outstanding_callback = callback;
600 g_object_ref (stream);
601 class->read_async (stream, buffer, count, io_priority, cancellable,
602 async_ready_callback_wrapper, user_data);
606 * g_input_stream_read_finish:
609 * @error: a #GError location to store the error occurring, or %NULL to
/* Finishes an asynchronous read. For a GSimpleAsyncResult it first
 * propagates a stored error and recognises results that were created
 * directly by g_input_stream_read_async; the last visible statement
 * delegates to the class read_finish method. The return statements of the
 * earlier branches are not visible in this excerpt. */
614 g_input_stream_read_finish (GInputStream *stream,
615 GAsyncResult *result,
618 GSimpleAsyncResult *simple;
619 GInputStreamClass *class;
621 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
622 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
624 if (G_IS_SIMPLE_ASYNC_RESULT (result))
626 simple = G_SIMPLE_ASYNC_RESULT (result);
627 if (g_simple_async_result_propagate_error (simple, error))
630 /* Special case read of 0 bytes */
631 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
635 class = G_INPUT_STREAM_GET_CLASS (stream);
636 return class->read_finish (stream, result, error);
640 * g_input_stream_skip_async:
641 * @stream: A #GInputStream.
642 * @count: the number of bytes that will be skipped from the stream
643 * @io_priority: the io priority of the request.
644 * @cancellable: optional #GCancellable object, %NULL to ignore.
645 * @callback: callback to call when the request is satisfied
646 * @user_data: the data to pass to callback function
648 * Request an asynchronous skip of @count bytes from the stream.
649 * When the operation is finished @callback will be called,
650 * giving the results.
652 * During an async request no other sync and async calls are allowed, and will
653 * result in %G_IO_ERROR_PENDING errors.
655 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
657 * On success, the number of bytes skipped will be passed to the
658 * callback. It is not an error if this is not the same as the requested size, as it
659 * can happen e.g. near the end of a file, but generally we try to skip
660 * as many bytes as requested. Zero is returned on end of file
661 * (or if @count is zero), but never otherwise.
663 * Any outstanding i/o request with higher priority (lower numerical value) will
664 * be executed before an outstanding request with lower priority. Default
665 * priority is %G_PRIORITY_DEFAULT.
667 * The asynchronous methods have a default fallback that uses threads to implement
668 * asynchronicity, so they are optional for inheriting classes. However, if you
669 * override one you must override all.
/* Asynchronous skip entry point. Invalid requests are reported through
 * g_simple_async_report_error_in_idle instead of synchronously; a valid
 * request is handed to the class skip_async method. Several lines (some
 * parameters, braces, return statements) are not visible in this excerpt. */
672 g_input_stream_skip_async (GInputStream *stream,
675 GCancellable *cancellable,
676 GAsyncReadyCallback callback,
679 GInputStreamClass *class;
680 GSimpleAsyncResult *simple;
682 g_return_if_fail (G_IS_INPUT_STREAM (stream));
/* Result completed from an idle callback and tagged with this function;
 * g_input_stream_skip_finish tests for that tag in its special case for a
 * skip of 0 bytes. The condition guarding this path is not visible here. */
686 simple = g_simple_async_result_new (G_OBJECT (stream),
689 g_input_stream_skip_async);
691 g_simple_async_result_complete_in_idle (simple);
692 g_object_unref (simple);
/* Reject a count that is negative when viewed as gssize. */
696 if (((gssize) count) < 0)
698 g_simple_async_report_error_in_idle (G_OBJECT (stream),
701 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
702 _("Too large count value passed to g_input_stream_skip_async"));
706 if (stream->priv->closed)
708 g_simple_async_report_error_in_idle (G_OBJECT (stream),
711 G_IO_ERROR, G_IO_ERROR_CLOSED,
712 _("Stream is already closed"));
716 if (stream->priv->pending)
718 g_simple_async_report_error_in_idle (G_OBJECT (stream),
721 G_IO_ERROR, G_IO_ERROR_PENDING,
722 _("Stream has outstanding operation"));
726 class = G_INPUT_STREAM_GET_CLASS (stream);
/* Record the in-flight state and keep the stream alive until
 * async_ready_callback_wrapper releases the reference. */
727 stream->priv->pending = TRUE;
728 stream->priv->outstanding_callback = callback;
729 g_object_ref (stream);
730 class->skip_async (stream, count, io_priority, cancellable,
731 async_ready_callback_wrapper, user_data);
735 * g_input_stream_skip_finish:
738 * @error: a #GError location to store the error occurring, or %NULL to
/* Finishes an asynchronous skip. For a GSimpleAsyncResult it first
 * propagates a stored error and recognises results that were created
 * directly by g_input_stream_skip_async; the last visible statement
 * delegates to the class skip_finish method. The return statements of the
 * earlier branches are not visible in this excerpt. */
743 g_input_stream_skip_finish (GInputStream *stream,
744 GAsyncResult *result,
747 GSimpleAsyncResult *simple;
748 GInputStreamClass *class;
750 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
751 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
753 if (G_IS_SIMPLE_ASYNC_RESULT (result))
755 simple = G_SIMPLE_ASYNC_RESULT (result);
756 if (g_simple_async_result_propagate_error (simple, error))
759 /* Special case skip of 0 bytes */
760 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
764 class = G_INPUT_STREAM_GET_CLASS (stream);
765 return class->skip_finish (stream, result, error);
769 * g_input_stream_close_async:
770 * @stream: A #GInputStream.
771 * @io_priority: the io priority of the request.
772 * @cancellable: optional cancellable object
773 * @callback: callback to call when the request is satisfied
774 * @user_data: the data to pass to callback function
776 * Requests an asynchronous close of the stream, releasing resources related to it.
777 * When the operation is finished @callback will be called, giving the results.
779 * For behaviour details see g_input_stream_close().
781 * The asynchronous methods have a default fallback that uses threads to implement
782 * asynchronicity, so they are optional for inheriting classes. However, if you
783 * override one you must override all.
/* Asynchronous close entry point. An already closed stream gets a result
 * completed from an idle callback, a busy stream gets a pending error
 * reported in idle, and otherwise the request is handed to the class
 * close_async method. Several lines (some parameters, braces, return
 * statements) are not visible in this excerpt. */
786 g_input_stream_close_async (GInputStream *stream,
788 GCancellable *cancellable,
789 GAsyncReadyCallback callback,
792 GInputStreamClass *class;
793 GSimpleAsyncResult *simple;
795 g_return_if_fail (G_IS_INPUT_STREAM (stream));
/* Already closed: the result is tagged with this function, which
 * g_input_stream_close_finish tests for in its already-closed special case. */
797 if (stream->priv->closed)
799 simple = g_simple_async_result_new (G_OBJECT (stream),
802 g_input_stream_close_async);
804 g_simple_async_result_complete_in_idle (simple);
805 g_object_unref (simple);
809 if (stream->priv->pending)
811 g_simple_async_report_error_in_idle (G_OBJECT (stream),
814 G_IO_ERROR, G_IO_ERROR_PENDING,
815 _("Stream has outstanding operation"));
819 class = G_INPUT_STREAM_GET_CLASS (stream);
/* Record the in-flight state and keep the stream alive until
 * async_ready_close_callback_wrapper releases the reference. */
820 stream->priv->pending = TRUE;
821 stream->priv->outstanding_callback = callback;
822 g_object_ref (stream);
823 class->close_async (stream, io_priority, cancellable,
824 async_ready_close_callback_wrapper, user_data);
828 * g_input_stream_close_finish:
831 * @error: a #GError location to store the error occurring, or %NULL to
/* Finishes an asynchronous close. For a GSimpleAsyncResult it first
 * propagates a stored error and recognises results that were created
 * directly by g_input_stream_close_async; the last visible statement
 * delegates to the class close_finish method. The return statements of the
 * earlier branches are not visible in this excerpt. */
836 g_input_stream_close_finish (GInputStream *stream,
837 GAsyncResult *result,
840 GSimpleAsyncResult *simple;
841 GInputStreamClass *class;
843 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
844 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
846 if (G_IS_SIMPLE_ASYNC_RESULT (result))
848 simple = G_SIMPLE_ASYNC_RESULT (result);
849 if (g_simple_async_result_propagate_error (simple, error))
852 /* Special case already closed */
853 if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
857 class = G_INPUT_STREAM_GET_CLASS (stream);
858 return class->close_finish (stream, result, error);
862 * g_input_stream_is_closed:
863 * @stream: input stream.
865 * Returns: %TRUE if the stream is closed.
/* Returns the closed flag of the stream. A stream argument that fails the
 * G_IS_INPUT_STREAM check makes the precondition macro yield TRUE. */
868 g_input_stream_is_closed (GInputStream *stream)
870 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
872 return stream->priv->closed;
876 * g_input_stream_has_pending:
877 * @stream: input stream.
879 * Returns: %TRUE if @stream has pending actions.
/* Returns the pending flag of the stream. A stream argument that fails the
 * G_IS_INPUT_STREAM check makes the precondition macro yield TRUE. */
882 g_input_stream_has_pending (GInputStream *stream)
884 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
886 return stream->priv->pending;
890 * g_input_stream_set_pending:
891 * @stream: input stream
894 * Sets whether @stream has actions pending.
/* Stores the given value into the pending flag of the stream. The
 * declaration of the pending parameter is not visible in this excerpt. */
897 g_input_stream_set_pending (GInputStream *stream,
900 g_return_if_fail (G_IS_INPUT_STREAM (stream));
902 stream->priv->pending = pending;
905 /********************************************
906 * Default implementation of async ops *
907 ********************************************/
/* Number of bytes requested by the caller; set from count in
 * g_input_stream_real_read_async and passed to the class read method in
 * read_async_thread. The surrounding ReadData declaration and its other
 * fields (buffer and count_read are used below) are not visible here. */
911 gsize count_requested;
/* Worker function given to g_simple_async_result_run_in_thread by
 * g_input_stream_real_read_async. It performs the class read call using the
 * request stored on the result and records either the byte count or the
 * error on that result. */
916 read_async_thread (GSimpleAsyncResult *res,
918 GCancellable *cancellable)
921 GInputStreamClass *class;
922 GError *error = NULL;
/* op is the ReadData attached by g_input_stream_real_read_async. */
924 op = g_simple_async_result_get_op_res_gpointer (res);
926 class = G_INPUT_STREAM_GET_CLASS (object);
928 op->count_read = class->read (G_INPUT_STREAM (object),
929 op->buffer, op->count_requested,
930 cancellable, &error);
/* -1 signals failure: copy the error onto the result and free the local. */
931 if (op->count_read == -1)
933 g_simple_async_result_set_from_error (res, error);
934 g_error_free (error);
/* Default read_async implementation: stores the request in a newly
 * allocated ReadData attached to a GSimpleAsyncResult and runs
 * read_async_thread through g_simple_async_result_run_in_thread. */
939 g_input_stream_real_read_async (GInputStream *stream,
943 GCancellable *cancellable,
944 GAsyncReadyCallback callback,
947 GSimpleAsyncResult *res;
950 op = g_new (ReadData, 1);
/* The result is tagged with this function; g_input_stream_real_read_finish
 * asserts on that tag. g_free is registered to release op. */
951 res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
952 g_simple_async_result_set_op_res_gpointer (res, op, g_free);
954 op->count_requested = count;
956 g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
/* Drop the local reference to the result. */
957 g_object_unref (res);
/* Default read_finish implementation: checks that the result was produced by
 * g_input_stream_real_read_async and returns the byte count that
 * read_async_thread stored in the attached ReadData. */
961 g_input_stream_real_read_finish (GInputStream *stream,
962 GAsyncResult *result,
965 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
/* Results from any other source are a programming error. */
968 g_assert (g_simple_async_result_get_source_tag (simple) ==
969 g_input_stream_real_read_async);
971 op = g_simple_async_result_get_op_res_gpointer (simple);
973 return op->count_read;
/* Fields of the per-operation skip data (used as SkipData below); the
 * surrounding declaration is not visible in this excerpt. */
/* Number of bytes the caller asked to skip. */
977 gsize count_requested;
/* Result of the skip, returned by g_input_stream_real_skip_finish; the
 * worker treats -1 as failure. */
978 gssize count_skipped;
/* Worker function given to g_simple_async_result_run_in_thread by
 * g_input_stream_real_skip_async. It performs the class skip call and
 * records either the skipped count or the error on the result. */
983 skip_async_thread (GSimpleAsyncResult *res,
985 GCancellable *cancellable)
988 GInputStreamClass *class;
989 GError *error = NULL;
991 class = G_INPUT_STREAM_GET_CLASS (object);
/* op is the SkipData attached by g_input_stream_real_skip_async. */
992 op = g_simple_async_result_get_op_res_gpointer (res);
993 op->count_skipped = class->skip (G_INPUT_STREAM (object),
995 cancellable, &error);
/* -1 signals failure: copy the error onto the result and free the local. */
996 if (op->count_skipped == -1)
998 g_simple_async_result_set_from_error (res, error);
999 g_error_free (error);
/* State carried across the chained reads of the read_async based skip
 * fallback. Only some fields are visible in this excerpt; buffer, count,
 * io_prio and user_data are also used by the code below. */
/* Running total of bytes skipped so far. */
1006 gsize count_skipped;
/* Cancellable passed on to every chained read_async call. */
1008 GCancellable *cancellable;
/* Caller callback, used when the final result is created. */
1010 GAsyncReadyCallback callback;
1011 } SkipFallbackAsyncData;
/* Callback for each read issued by the read_async based skip fallback. It
 * adds the bytes just read to the running total, issues another read while
 * data->count is still positive, and otherwise builds and completes the
 * final result for the caller. Several lines (braces, else branches, the
 * update of data->count, cleanup) are not visible in this excerpt. */
1014 skip_callback_wrapper (GObject *source_object,
1018 GInputStreamClass *class;
1019 SkipFallbackAsyncData *data = user_data;
1021 GSimpleAsyncResult *simple;
1022 GError *error = NULL;
1025 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1030 data->count_skipped += ret;
/* More to skip: read at most 8192 bytes into the scratch buffer again, with
 * this function as the completion callback. */
1032 if (data->count > 0)
1034 class = G_INPUT_STREAM_GET_CLASS (source_object);
1035 class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1036 skip_callback_wrapper, data);
/* Final result: carries the total in a SkipData and is tagged with
 * g_input_stream_real_skip_async so that g_input_stream_real_skip_finish
 * accepts it. */
1041 op = g_new0 (SkipData, 1);
1042 op->count_skipped = data->count_skipped;
1043 simple = g_simple_async_result_new (source_object,
1044 data->callback, data->user_data,
1045 g_input_stream_real_skip_async);
1047 g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
/* A cancellation after some bytes were skipped is treated as a partial
 * result rather than an error. */
1051 if (data->count_skipped &&
1052 error->domain == G_IO_ERROR &&
1053 error->code == G_IO_ERROR_CANCELLED)
1054 { /* No error, return partial read */ }
1056 g_simple_async_result_set_from_error (simple, error);
1057 g_error_free (error);
1060 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1061 g_simple_async_result_complete (simple);
1062 g_object_unref (simple);
/* Default skip_async implementation with two strategies. When the class
 * still uses the default read_async, the skip runs skip_async_thread through
 * g_simple_async_result_run_in_thread. When the class has its own
 * read_async, the skip is emulated by chained reads of at most 8192 bytes
 * that are finished in skip_callback_wrapper. */
1068 g_input_stream_real_skip_async (GInputStream *stream,
1071 GCancellable *cancellable,
1072 GAsyncReadyCallback callback,
1075 GInputStreamClass *class;
1077 SkipFallbackAsyncData *data;
1078 GSimpleAsyncResult *res;
1080 class = G_INPUT_STREAM_GET_CLASS (stream);
1082 if (class->read_async == g_input_stream_real_read_async)
1084 /* Read is thread-using async fallback.
1085 * Make skip use threads too, so that we can use a possible sync skip
1086 * implementation. */
1087 op = g_new0 (SkipData, 1);
1089 res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1090 g_input_stream_real_skip_async);
/* g_free is registered to release op. */
1092 g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1094 op->count_requested = count;
1096 g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1097 g_object_unref (res);
1101 /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1103 /* There is a custom async read function, lets use that. */
/* NOTE(review): the cancellable pointer is stored without a visible
 * reference being taken - confirm the caller keeps it alive. */
1104 data = g_new (SkipFallbackAsyncData, 1);
1105 data->count = count;
1106 data->count_skipped = 0;
1107 data->io_prio = io_priority;
1108 data->cancellable = cancellable;
1109 data->callback = callback;
1110 data->user_data = user_data;
/* First chunk; skip_callback_wrapper continues the chain. */
1111 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1112 skip_callback_wrapper, data);
/* Default skip_finish implementation: checks that the result is tagged with
 * g_input_stream_real_skip_async and returns the skipped count stored in the
 * attached SkipData. */
1118 g_input_stream_real_skip_finish (GInputStream *stream,
1119 GAsyncResult *result,
1122 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
/* Results from any other source are a programming error. */
1125 g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1126 op = g_simple_async_result_get_op_res_gpointer (simple);
1127 return op->count_skipped;
/* Worker function for the default close_async implementation. It calls the
 * class close method on the stream and copies an error onto the result; the
 * condition guarding the error path is not visible in this excerpt. */
1131 close_async_thread (GSimpleAsyncResult *res,
1133 GCancellable *cancellable)
1135 GInputStreamClass *class;
1136 GError *error = NULL;
1139 /* Auto handling of cancelation disabled, and ignore
1140 cancellation, since we want to close things anyway, although
1141 possibly in a quick-n-dirty way. At least we never want to leak
1144 class = G_INPUT_STREAM_GET_CLASS (object);
1145 result = class->close (G_INPUT_STREAM (object), cancellable, &error);
1148 g_simple_async_result_set_from_error (res, error);
1149 g_error_free (error);
/* Default close_async implementation: creates a GSimpleAsyncResult tagged
 * with this function, turns off its automatic cancellation handling and
 * runs the close through g_simple_async_result_run_in_thread. The arguments
 * of the run_in_thread call are not visible in this excerpt. */
1154 g_input_stream_real_close_async (GInputStream *stream,
1156 GCancellable *cancellable,
1157 GAsyncReadyCallback callback,
1160 GSimpleAsyncResult *res;
1162 res = g_simple_async_result_new (G_OBJECT (stream),
1165 g_input_stream_real_close_async);
/* Cancellation handling is switched off on the result before it is run. */
1167 g_simple_async_result_set_handle_cancellation (res, FALSE);
1169 g_simple_async_result_run_in_thread (res,
/* Drop the local reference to the result. */
1173 g_object_unref (res);
/* Default close_finish implementation: checks that the result is tagged with
 * g_input_stream_real_close_async. The rest of the function, including its
 * return value, is not visible in this excerpt. */
1177 g_input_stream_real_close_finish (GInputStream *stream,
1178 GAsyncResult *result,
1181 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1182 g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);