1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
21 * Author: Christian Kellner <gicmo@gnome.org>
26 #include "gbufferedinputstream.h"
27 #include "ginputstream.h"
28 #include "gsimpleasyncresult.h"
33 #define DEFAULT_BUFFER_SIZE 4096
35 struct _GBufferedInputStreamPrivate {
40 GAsyncReadyCallback outstanding_callback;
48 static void g_buffered_input_stream_set_property (GObject *object,
53 static void g_buffered_input_stream_get_property (GObject *object,
57 static void g_buffered_input_stream_finalize (GObject *object);
60 static gssize g_buffered_input_stream_skip (GInputStream *stream,
62 GCancellable *cancellable,
64 static void g_buffered_input_stream_skip_async (GInputStream *stream,
67 GCancellable *cancellable,
68 GAsyncReadyCallback callback,
70 static gssize g_buffered_input_stream_skip_finish (GInputStream *stream,
73 static gssize g_buffered_input_stream_read (GInputStream *stream,
76 GCancellable *cancellable,
78 static void g_buffered_input_stream_read_async (GInputStream *stream,
82 GCancellable *cancellable,
83 GAsyncReadyCallback callback,
85 static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
88 static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
90 GCancellable *cancellable,
92 static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
95 GCancellable *cancellable,
96 GAsyncReadyCallback callback,
98 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
102 static void compact_buffer (GBufferedInputStream *stream);
104 G_DEFINE_TYPE (GBufferedInputStream,
105 g_buffered_input_stream,
106 G_TYPE_FILTER_INPUT_STREAM)
110 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
112 GObjectClass *object_class;
113 GInputStreamClass *istream_class;
114 GBufferedInputStreamClass *bstream_class;
116 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
118 object_class = G_OBJECT_CLASS (klass);
119 object_class->get_property = g_buffered_input_stream_get_property;
120 object_class->set_property = g_buffered_input_stream_set_property;
121 object_class->finalize = g_buffered_input_stream_finalize;
123 istream_class = G_INPUT_STREAM_CLASS (klass);
124 istream_class->skip = g_buffered_input_stream_skip;
125 istream_class->skip_async = g_buffered_input_stream_skip_async;
126 istream_class->skip_finish = g_buffered_input_stream_skip_finish;
127 istream_class->read = g_buffered_input_stream_read;
128 istream_class->read_async = g_buffered_input_stream_read_async;
129 istream_class->read_finish = g_buffered_input_stream_read_finish;
131 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
132 bstream_class->fill = g_buffered_input_stream_real_fill;
133 bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
134 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
136 g_object_class_install_property (object_class,
138 g_param_spec_uint ("buffer-size",
140 P_("The size of the backend buffer"),
144 G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
145 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
151 * g_buffered_input_stream_get_buffer_size:
152 * @stream: #GBufferedInputStream.
154 * Returns: the current buffer size, or -1 on error.
157 g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream)
159 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
161 return stream->priv->len;
165 * g_buffered_input_stream_set_buffer_size:
166 * @stream: #GBufferedInputStream.
169 * Sets the size of the internal buffer of @stream to @size, or to the
170 * size of the contents of the buffer. The buffer can never be resized
171 * smaller than its current contents.
174 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
177 GBufferedInputStreamPrivate *priv;
181 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
187 in_buffer = priv->end - priv->pos;
189 /* Never resize smaller than current buffer contents */
190 size = MAX (size, in_buffer);
192 buffer = g_malloc (size);
193 memcpy (buffer, priv->buffer + priv->pos, in_buffer);
196 priv->end = in_buffer;
197 g_free (priv->buffer);
198 priv->buffer = buffer;
205 priv->buffer = g_malloc (size);
210 g_buffered_input_stream_set_property (GObject *object,
215 GBufferedInputStreamPrivate *priv;
216 GBufferedInputStream *bstream;
218 bstream = G_BUFFERED_INPUT_STREAM (object);
219 priv = bstream->priv;
224 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
228 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
235 g_buffered_input_stream_get_property (GObject *object,
240 GBufferedInputStreamPrivate *priv;
241 GBufferedInputStream *bstream;
243 bstream = G_BUFFERED_INPUT_STREAM (object);
244 priv = bstream->priv;
250 g_value_set_uint (value, priv->len);
253 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
260 g_buffered_input_stream_finalize (GObject *object)
262 GBufferedInputStreamPrivate *priv;
263 GBufferedInputStream *stream;
265 stream = G_BUFFERED_INPUT_STREAM (object);
268 g_free (priv->buffer);
270 if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
271 (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
275 g_buffered_input_stream_init (GBufferedInputStream *stream)
277 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
278 G_TYPE_BUFFERED_INPUT_STREAM,
279 GBufferedInputStreamPrivate);
284 * g_buffered_input_stream_new:
285 * @base_stream: a #GInputStream.
287 * Creates a new #GInputStream from the given @base_stream, with
288 * a buffer set to the default size (4 kilobytes).
290 * Returns: a #GInputStream for the given @base_stream.
293 g_buffered_input_stream_new (GInputStream *base_stream)
295 GInputStream *stream;
297 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
299 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
300 "base-stream", base_stream,
307 * g_buffered_input_stream_new_sized:
308 * @base_stream: a #GInputStream.
311 * Creates a new #GBufferedInputStream from the given @base_stream, with
312 * a buffer set to @size.
314 * Returns: a #GInputStream.
317 g_buffered_input_stream_new_sized (GInputStream *base_stream,
320 GInputStream *stream;
322 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
324 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
325 "base-stream", base_stream,
326 "buffer-size", (guint)size,
333 * g_buffered_input_stream_fill:
334 * @stream: #GBufferedInputStream.
335 * @count: the number of bytes that will be read from the stream.
336 * @cancellable: optional #GCancellable object, %NULL to ignore.
337 * @error: location to store the error occurring, or %NULL to ignore.
339 * Tries to read @count bytes from the stream into the buffer.
340 * Will block during this read.
342 * If @count is zero, returns zero and does nothing. A value of @count
343 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
345 * On success, the number of bytes read into the buffer is returned.
346 * It is not an error if this is not the same as the requested size, as it
347 * can happen e.g. near the end of a file. Zero is returned on end of file
348 * (or if @count is zero), but never otherwise.
350 * If @cancellable is not %NULL, then the operation can be cancelled by
351 * triggering the cancellable object from another thread. If the operation
352 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
353 * operation was partially finished when the operation was cancelled the
354 * partial result will be returned, without an error.
356 * On error -1 is returned and @error is set accordingly.
358 * For the asynchronous, non-blocking, version of this function, see
359 * g_buffered_input_stream_fill_async().
361 * Returns: the number of bytes read into @stream's buffer, up to @count,
365 g_buffered_input_stream_fill (GBufferedInputStream *stream,
367 GCancellable *cancellable,
370 GBufferedInputStreamClass *class;
371 GInputStream *input_stream;
374 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
376 input_stream = G_INPUT_STREAM (stream);
378 if (g_input_stream_is_closed (input_stream))
380 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
381 _("Stream is already closed"));
385 if (g_input_stream_has_pending (input_stream))
387 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
388 _("Stream has outstanding operation"));
392 g_input_stream_set_pending (input_stream, TRUE);
395 g_push_current_cancellable (cancellable);
397 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
398 res = class->fill (stream, count, cancellable, error);
401 g_pop_current_cancellable (cancellable);
403 g_input_stream_set_pending (input_stream, FALSE);
409 async_fill_callback_wrapper (GObject *source_object,
413 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
415 g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
416 (*stream->priv->outstanding_callback) (source_object, res, user_data);
417 g_object_unref (stream);
421 * g_buffered_input_stream_fill_async:
422 * @stream: #GBufferedInputStream.
424 * @io_priority: the I/O priority of the request.
425 * @cancellable: optional #GCancellable object
426 * @callback: a #GAsyncReadyCallback.
427 * @user_data: a #gpointer.
429 * Reads data into @stream's buffer asynchronously, up to @count size.
430 * @io_priority can be used to prioritize reads. For the synchronous
431 * version of this function, see g_buffered_input_stream_fill().
436 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
439 GCancellable *cancellable,
440 GAsyncReadyCallback callback,
443 GBufferedInputStreamClass *class;
444 GSimpleAsyncResult *simple;
446 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
450 simple = g_simple_async_result_new (G_OBJECT (stream),
453 g_buffered_input_stream_fill_async);
454 g_simple_async_result_complete_in_idle (simple);
455 g_object_unref (simple);
459 if (((gssize) count) < 0)
461 g_simple_async_report_error_in_idle (G_OBJECT (stream),
464 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
465 _("Too large count value passed to g_input_stream_read_async"));
469 if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
471 g_simple_async_report_error_in_idle (G_OBJECT (stream),
474 G_IO_ERROR, G_IO_ERROR_CLOSED,
475 _("Stream is already closed"));
479 if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
481 g_simple_async_report_error_in_idle (G_OBJECT (stream),
484 G_IO_ERROR, G_IO_ERROR_PENDING,
485 _("Stream has outstanding operation"));
489 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
491 g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
492 stream->priv->outstanding_callback = callback;
493 g_object_ref (stream);
494 class->fill_async (stream, count, io_priority, cancellable,
495 async_fill_callback_wrapper, user_data);
499 * g_buffered_input_stream_fill_finish:
500 * @stream: a #GBufferedInputStream.
501 * @result: a #GAsyncResult.
504 * Finishes an asynchronous read.
506 * Returns: a #gssize with the number of bytes read, or -1 on an error.
509 g_buffered_input_stream_fill_finish (GBufferedInputStream *stream,
510 GAsyncResult *result,
513 GSimpleAsyncResult *simple;
514 GBufferedInputStreamClass *class;
516 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
517 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
519 if (G_IS_SIMPLE_ASYNC_RESULT (result))
521 simple = G_SIMPLE_ASYNC_RESULT (result);
522 if (g_simple_async_result_propagate_error (simple, error))
525 /* Special case read of 0 bytes */
526 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
530 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
531 return class->fill_finish (stream, result, error);
535 * g_buffered_input_stream_get_available:
536 * @stream: #GBufferedInputStream.
538 * Returns: the number of bytes currently available in the buffer, or -1 on error.
541 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
543 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
545 return stream->priv->end - stream->priv->pos;
549 * g_buffered_input_stream_peek:
550 * @stream: a #GBufferedInputStream.
551 * @buffer: a pointer to an allocated chunk of memory.
558 g_buffered_input_stream_peek (GBufferedInputStream *stream,
566 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
567 g_return_val_if_fail (buffer != NULL, -1);
569 available = g_buffered_input_stream_get_available (stream);
571 if (offset > available)
574 end = MIN (offset + count, available);
575 count = end - offset;
577 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
582 * g_buffered_input_stream_peek_buffer:
583 * @stream: a #GBufferedInputStream.
584 * @count: a #gsize to get the number of bytes available in the buffer.
586 * Returns the buffer with the currently available bytes. The returned
587 * buffer must not be modified and will become invalid when reading from
588 * the stream or filling the buffer.
590 * Returns: read-only buffer
593 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
596 GBufferedInputStreamPrivate *priv;
598 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
603 *count = priv->end - priv->pos;
606 return priv->buffer + priv->pos;
610 compact_buffer (GBufferedInputStream *stream)
612 GBufferedInputStreamPrivate *priv;
617 current_size = priv->end - priv->pos;
619 g_memmove (priv->buffer,
620 priv->buffer + priv->pos,
624 priv->end = current_size;
628 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
630 GCancellable *cancellable,
633 GBufferedInputStreamPrivate *priv;
634 GInputStream *base_stream;
643 in_buffer = priv->end - priv->pos;
645 /* Never fill more than can fit in the buffer */
646 count = MIN (count, priv->len - in_buffer);
648 /* If requested length does not fit at end, compact */
649 if (priv->len - priv->end < count)
650 compact_buffer (stream);
652 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
653 nread = g_input_stream_read (base_stream,
654 priv->buffer + priv->end,
666 g_buffered_input_stream_skip (GInputStream *stream,
668 GCancellable *cancellable,
671 GBufferedInputStream *bstream;
672 GBufferedInputStreamPrivate *priv;
673 GInputStream *base_stream;
674 gsize available, bytes_skipped;
677 bstream = G_BUFFERED_INPUT_STREAM (stream);
678 priv = bstream->priv;
680 available = priv->end - priv->pos;
682 if (count <= available)
688 /* Full request not available, skip all currently available and request refill for more */
692 bytes_skipped = available;
695 if (bytes_skipped > 0)
696 error = NULL; /* Ignore further errors if we already read some data */
698 if (count > priv->len)
700 /* Large request, shortcut buffer */
702 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
704 nread = g_input_stream_skip (base_stream,
709 if (nread < 0 && bytes_skipped == 0)
713 bytes_skipped += nread;
715 return bytes_skipped;
718 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
719 nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
720 g_input_stream_set_pending (stream, TRUE); /* enable again */
724 if (bytes_skipped == 0)
727 return bytes_skipped;
730 available = priv->end - priv->pos;
731 count = MIN (count, available);
733 bytes_skipped += count;
736 return bytes_skipped;
740 g_buffered_input_stream_read (GInputStream *stream,
743 GCancellable *cancellable,
746 GBufferedInputStream *bstream;
747 GBufferedInputStreamPrivate *priv;
748 GInputStream *base_stream;
749 gsize available, bytes_read;
752 bstream = G_BUFFERED_INPUT_STREAM (stream);
753 priv = bstream->priv;
755 available = priv->end - priv->pos;
757 if (count <= available)
759 memcpy (buffer, priv->buffer + priv->pos, count);
764 /* Full request not available, read all currently available and request refill for more */
766 memcpy (buffer, priv->buffer + priv->pos, available);
769 bytes_read = available;
773 error = NULL; /* Ignore further errors if we already read some data */
775 if (count > priv->len)
777 /* Large request, shortcut buffer */
779 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
781 nread = g_input_stream_read (base_stream,
782 (char *)buffer + bytes_read,
787 if (nread < 0 && bytes_read == 0)
796 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
797 nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
798 g_input_stream_set_pending (stream, TRUE); /* enable again */
807 available = priv->end - priv->pos;
808 count = MIN (count, available);
810 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
818 * g_buffered_input_stream_read_byte:
819 * @stream: #GBufferedInputStream.
820 * @cancellable: optional #GCancellable object, %NULL to ignore.
821 * @error: location to store the error occurring, or %NULL to ignore.
823 * Tries to read a single byte from the stream or the buffer. Will block
826 * On success, the byte read from the stream is returned. On end of stream
827 * -1 is returned but it's not an exceptional error and @error is not set.
829 * If @cancellable is not %NULL, then the operation can be cancelled by
830 * triggering the cancellable object from another thread. If the operation
831 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
832 * operation was partially finished when the operation was cancelled the
833 * partial result will be returned, without an error.
835 * On error -1 is returned and @error is set accordingly.
837 * Returns: the byte read from the @stream, or -1 on end of stream or error.
840 g_buffered_input_stream_read_byte (GBufferedInputStream *stream,
841 GCancellable *cancellable,
844 GBufferedInputStreamPrivate *priv;
845 GInputStream *input_stream;
849 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
852 input_stream = G_INPUT_STREAM (stream);
854 if (g_input_stream_is_closed (input_stream))
856 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
857 _("Stream is already closed"));
861 if (g_input_stream_has_pending (input_stream))
863 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
864 _("Stream has outstanding operation"));
868 available = priv->end - priv->pos;
871 return priv->buffer[priv->pos++];
873 /* Byte not available, request refill for more */
876 g_push_current_cancellable (cancellable);
881 nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
884 g_pop_current_cancellable (cancellable);
887 return -1; /* error or end of stream */
889 return priv->buffer[priv->pos++];
892 /* ************************** */
893 /* Async stuff implementation */
894 /* ************************** */
897 fill_async_callback (GObject *source_object,
898 GAsyncResult *result,
903 GSimpleAsyncResult *simple;
908 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
911 g_simple_async_result_set_op_res_gssize (simple, res);
914 g_simple_async_result_set_from_error (simple, error);
915 g_error_free (error);
918 /* Complete immediately, not in idle, since we're already in a mainloop callout */
919 g_simple_async_result_complete (simple);
920 g_object_unref (simple);
924 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
927 GCancellable *cancellable,
928 GAsyncReadyCallback callback,
931 GBufferedInputStreamPrivate *priv;
932 GInputStream *base_stream;
933 GSimpleAsyncResult *simple;
941 in_buffer = priv->end - priv->pos;
943 /* Never fill more than can fit in the buffer */
944 count = MIN (count, priv->len - in_buffer);
946 /* If requested length does not fit at end, compact */
947 if (priv->len - priv->end < count)
948 compact_buffer (stream);
950 simple = g_simple_async_result_new (G_OBJECT (stream),
952 g_buffered_input_stream_real_fill_async);
954 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
955 g_input_stream_read_async (base_stream,
956 priv->buffer + priv->end,
965 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
966 GAsyncResult *result,
969 GSimpleAsyncResult *simple;
972 simple = G_SIMPLE_ASYNC_RESULT (result);
973 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
975 nread = g_simple_async_result_get_op_res_gssize (simple);
986 free_read_async_data (gpointer _data)
988 ReadAsyncData *data = _data;
989 g_slice_free (ReadAsyncData, data);
993 large_read_callback (GObject *source_object,
994 GAsyncResult *result,
997 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1002 data = g_simple_async_result_get_op_res_gpointer (simple);
1005 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1008 /* Only report the error if we've not already read some data */
1009 if (nread < 0 && data->bytes_read == 0)
1010 g_simple_async_result_set_from_error (simple, error);
1013 data->bytes_read += nread;
1016 g_error_free (error);
1018 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1019 g_simple_async_result_complete (simple);
1020 g_object_unref (simple);
1024 read_fill_buffer_callback (GObject *source_object,
1025 GAsyncResult *result,
1028 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1029 GBufferedInputStream *bstream;
1030 GBufferedInputStreamPrivate *priv;
1031 ReadAsyncData *data;
1036 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1037 priv = bstream->priv;
1039 g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1041 data = g_simple_async_result_get_op_res_gpointer (simple);
1044 nread = g_buffered_input_stream_fill_finish (bstream,
1047 if (nread < 0 && data->bytes_read == 0)
1048 g_simple_async_result_set_from_error (simple, error);
1053 available = priv->end - priv->pos;
1054 data->count = MIN (data->count, available);
1056 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1057 data->bytes_read += data->count;
1058 priv->pos += data->count;
1062 g_error_free (error);
1064 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1065 g_simple_async_result_complete (simple);
1066 g_object_unref (simple);
1070 g_buffered_input_stream_read_async (GInputStream *stream,
1074 GCancellable *cancellable,
1075 GAsyncReadyCallback callback,
1078 GBufferedInputStream *bstream;
1079 GBufferedInputStreamPrivate *priv;
1080 GInputStream *base_stream;
1082 GSimpleAsyncResult *simple;
1083 ReadAsyncData *data;
1085 bstream = G_BUFFERED_INPUT_STREAM (stream);
1086 priv = bstream->priv;
1088 data = g_slice_new (ReadAsyncData);
1089 data->buffer = buffer;
1090 data->bytes_read = 0;
1091 simple = g_simple_async_result_new (G_OBJECT (stream),
1092 callback, user_data,
1093 g_buffered_input_stream_read_async);
1094 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1096 available = priv->end - priv->pos;
1098 if (count <= available)
1100 memcpy (buffer, priv->buffer + priv->pos, count);
1102 data->bytes_read = count;
1104 g_simple_async_result_complete_in_idle (simple);
1105 g_object_unref (simple);
1110 /* Full request not available, read all currently available and request refill for more */
1112 memcpy (buffer, priv->buffer + priv->pos, available);
1118 data->bytes_read = available;
1119 data->count = count;
1121 if (count > priv->len)
1123 /* Large request, shortcut buffer */
1125 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1127 g_input_stream_read_async (base_stream,
1128 (char *)buffer + data->bytes_read,
1130 io_priority, cancellable,
1131 large_read_callback,
1136 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1137 g_buffered_input_stream_fill_async (bstream, priv->len,
1138 io_priority, cancellable,
1139 read_fill_buffer_callback, simple);
1144 g_buffered_input_stream_read_finish (GInputStream *stream,
1145 GAsyncResult *result,
1148 GSimpleAsyncResult *simple;
1149 ReadAsyncData *data;
1151 simple = G_SIMPLE_ASYNC_RESULT (result);
1153 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1155 data = g_simple_async_result_get_op_res_gpointer (simple);
1157 return data->bytes_read;
1161 gssize bytes_skipped;
1166 free_skip_async_data (gpointer _data)
1168 SkipAsyncData *data = _data;
1169 g_slice_free (SkipAsyncData, data);
1173 large_skip_callback (GObject *source_object,
1174 GAsyncResult *result,
1177 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1178 SkipAsyncData *data;
1182 data = g_simple_async_result_get_op_res_gpointer (simple);
1185 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1188 /* Only report the error if we've not already skipped some data */
1189 if (nread < 0 && data->bytes_skipped == 0)
1190 g_simple_async_result_set_from_error (simple, error);
1193 data->bytes_skipped += nread;
1196 g_error_free (error);
1198 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1199 g_simple_async_result_complete (simple);
1200 g_object_unref (simple);
1204 skip_fill_buffer_callback (GObject *source_object,
1205 GAsyncResult *result,
1208 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1209 GBufferedInputStream *bstream;
1210 GBufferedInputStreamPrivate *priv;
1211 SkipAsyncData *data;
1216 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1217 priv = bstream->priv;
1219 g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1221 data = g_simple_async_result_get_op_res_gpointer (simple);
1224 nread = g_buffered_input_stream_fill_finish (bstream,
1227 if (nread < 0 && data->bytes_skipped == 0)
1228 g_simple_async_result_set_from_error (simple, error);
1233 available = priv->end - priv->pos;
1234 data->count = MIN (data->count, available);
1236 data->bytes_skipped += data->count;
1237 priv->pos += data->count;
1241 g_error_free (error);
1243 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1244 g_simple_async_result_complete (simple);
1245 g_object_unref (simple);
1249 g_buffered_input_stream_skip_async (GInputStream *stream,
1252 GCancellable *cancellable,
1253 GAsyncReadyCallback callback,
1256 GBufferedInputStream *bstream;
1257 GBufferedInputStreamPrivate *priv;
1258 GInputStream *base_stream;
1260 GSimpleAsyncResult *simple;
1261 SkipAsyncData *data;
1263 bstream = G_BUFFERED_INPUT_STREAM (stream);
1264 priv = bstream->priv;
1266 data = g_slice_new (SkipAsyncData);
1267 data->bytes_skipped = 0;
1268 simple = g_simple_async_result_new (G_OBJECT (stream),
1269 callback, user_data,
1270 g_buffered_input_stream_skip_async);
1271 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1273 available = priv->end - priv->pos;
1275 if (count <= available)
1278 data->bytes_skipped = count;
1280 g_simple_async_result_complete_in_idle (simple);
1281 g_object_unref (simple);
1286 /* Full request not available, skip all currently available and request refill for more */
1293 data->bytes_skipped = available;
1294 data->count = count;
1296 if (count > priv->len)
1298 /* Large request, shortcut buffer */
1300 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1302 g_input_stream_skip_async (base_stream,
1304 io_priority, cancellable,
1305 large_skip_callback,
1310 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1311 g_buffered_input_stream_fill_async (bstream, priv->len,
1312 io_priority, cancellable,
1313 skip_fill_buffer_callback, simple);
1318 g_buffered_input_stream_skip_finish (GInputStream *stream,
1319 GAsyncResult *result,
1322 GSimpleAsyncResult *simple;
1323 SkipAsyncData *data;
1325 simple = G_SIMPLE_ASYNC_RESULT (result);
1327 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1329 data = g_simple_async_result_get_op_res_gpointer (simple);
1331 return data->bytes_skipped;
1334 /* vim: ts=2 sw=2 et */