1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General
17 * Public License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19 * Boston, MA 02111-1307, USA.
21 * Author: Christian Kellner <gicmo@gnome.org>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
32 * SECTION:gbufferedinputstream
33 * @short_description: Buffered Input Stream
34 * @see_also: #GFilterInputStream, #GInputStream
36 * Buffered input stream implements #GFilterInputStream and provides
39 * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
41 * To create a buffered input stream, use g_buffered_input_stream_new(), or
42 * g_buffered_input_stream_new_sized() to specify the buffer's size at construction.
44 * To get the size of a buffer within a buffered input stream, use
45 * g_buffered_input_stream_get_buffer_size(). To change the size of a
46 * buffered input stream's buffer, use g_buffered_input_stream_set_buffer_size().
47 * Note: the buffer's size cannot be reduced below the size of the data within the
54 #define DEFAULT_BUFFER_SIZE 4096
56 struct _GBufferedInputStreamPrivate {
61 GAsyncReadyCallback outstanding_callback;
69 static void g_buffered_input_stream_set_property (GObject *object,
74 static void g_buffered_input_stream_get_property (GObject *object,
78 static void g_buffered_input_stream_finalize (GObject *object);
81 static gssize g_buffered_input_stream_skip (GInputStream *stream,
83 GCancellable *cancellable,
85 static void g_buffered_input_stream_skip_async (GInputStream *stream,
88 GCancellable *cancellable,
89 GAsyncReadyCallback callback,
91 static gssize g_buffered_input_stream_skip_finish (GInputStream *stream,
94 static gssize g_buffered_input_stream_read (GInputStream *stream,
97 GCancellable *cancellable,
99 static void g_buffered_input_stream_read_async (GInputStream *stream,
103 GCancellable *cancellable,
104 GAsyncReadyCallback callback,
106 static gssize g_buffered_input_stream_read_finish (GInputStream *stream,
107 GAsyncResult *result,
109 static gssize g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
111 GCancellable *cancellable,
113 static void g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
116 GCancellable *cancellable,
117 GAsyncReadyCallback callback,
119 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
120 GAsyncResult *result,
123 static void compact_buffer (GBufferedInputStream *stream);
125 G_DEFINE_TYPE (GBufferedInputStream,
126 g_buffered_input_stream,
127 G_TYPE_FILTER_INPUT_STREAM)
131 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
133 GObjectClass *object_class;
134 GInputStreamClass *istream_class;
135 GBufferedInputStreamClass *bstream_class;
137 g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
139 object_class = G_OBJECT_CLASS (klass);
140 object_class->get_property = g_buffered_input_stream_get_property;
141 object_class->set_property = g_buffered_input_stream_set_property;
142 object_class->finalize = g_buffered_input_stream_finalize;
144 istream_class = G_INPUT_STREAM_CLASS (klass);
145 istream_class->skip = g_buffered_input_stream_skip;
146 istream_class->skip_async = g_buffered_input_stream_skip_async;
147 istream_class->skip_finish = g_buffered_input_stream_skip_finish;
148 istream_class->read = g_buffered_input_stream_read;
149 istream_class->read_async = g_buffered_input_stream_read_async;
150 istream_class->read_finish = g_buffered_input_stream_read_finish;
152 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
153 bstream_class->fill = g_buffered_input_stream_real_fill;
154 bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
155 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
157 g_object_class_install_property (object_class,
159 g_param_spec_uint ("buffer-size",
161 P_("The size of the backend buffer"),
165 G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
166 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
172 * g_buffered_input_stream_get_buffer_size:
173 * @stream: #GBufferedInputStream.
175 * Gets the size of the input buffer.
177 * Returns: the current buffer size, or %-1 on error.
180 g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream)
182 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
184 return stream->priv->len;
188 * g_buffered_input_stream_set_buffer_size:
189 * @stream: #GBufferedInputStream.
192 * Sets the size of the internal buffer of @stream to @size, or to the
193 * size of the contents of the buffer. The buffer can never be resized
194 * smaller than its current contents.
197 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
200 GBufferedInputStreamPrivate *priv;
204 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
210 in_buffer = priv->end - priv->pos;
212 /* Never resize smaller than current buffer contents */
213 size = MAX (size, in_buffer);
215 buffer = g_malloc (size);
216 memcpy (buffer, priv->buffer + priv->pos, in_buffer);
219 priv->end = in_buffer;
220 g_free (priv->buffer);
221 priv->buffer = buffer;
228 priv->buffer = g_malloc (size);
233 g_buffered_input_stream_set_property (GObject *object,
238 GBufferedInputStreamPrivate *priv;
239 GBufferedInputStream *bstream;
241 bstream = G_BUFFERED_INPUT_STREAM (object);
242 priv = bstream->priv;
247 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
251 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
258 g_buffered_input_stream_get_property (GObject *object,
263 GBufferedInputStreamPrivate *priv;
264 GBufferedInputStream *bstream;
266 bstream = G_BUFFERED_INPUT_STREAM (object);
267 priv = bstream->priv;
273 g_value_set_uint (value, priv->len);
276 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283 g_buffered_input_stream_finalize (GObject *object)
285 GBufferedInputStreamPrivate *priv;
286 GBufferedInputStream *stream;
288 stream = G_BUFFERED_INPUT_STREAM (object);
291 g_free (priv->buffer);
293 if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
294 (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
298 g_buffered_input_stream_init (GBufferedInputStream *stream)
300 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
301 G_TYPE_BUFFERED_INPUT_STREAM,
302 GBufferedInputStreamPrivate);
307 * g_buffered_input_stream_new:
308 * @base_stream: a #GInputStream.
310 * Creates a new #GInputStream from the given @base_stream, with
311 * a buffer set to the default size (4 kilobytes).
313 * Returns: a #GInputStream for the given @base_stream.
316 g_buffered_input_stream_new (GInputStream *base_stream)
318 GInputStream *stream;
320 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
322 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
323 "base-stream", base_stream,
330 * g_buffered_input_stream_new_sized:
331 * @base_stream: a #GOutputStream.
334 * Creates a new #GBufferedInputStream from the given @base_stream, with
335 * a buffer set to @size.
337 * Returns: a #GInputStream.
340 g_buffered_input_stream_new_sized (GInputStream *base_stream,
343 GInputStream *stream;
345 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
347 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
348 "base-stream", base_stream,
349 "buffer-size", (guint)size,
356 * g_buffered_input_stream_fill:
357 * @stream: #GBufferedInputStream.
358 * @count: the number of bytes that will be read from the stream.
359 * @cancellable: optional #GCancellable object, %NULL to ignore.
360 * @error: location to store the error occuring, or %NULL to ignore.
362 * Tries to read @count bytes from the stream into the buffer.
363 * Will block during this read.
365 * If @count is zero, returns zero and does nothing. A value of @count
366 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
368 * On success, the number of bytes read into the buffer is returned.
369 * It is not an error if this is not the same as the requested size, as it
370 * can happen e.g. near the end of a file. Zero is returned on end of file
371 * (or if @count is zero), but never otherwise.
373 * If @cancellable is not %NULL, then the operation can be cancelled by
374 * triggering the cancellable object from another thread. If the operation
375 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
376 * operation was partially finished when the operation was cancelled the
377 * partial result will be returned, without an error.
379 * On error -1 is returned and @error is set accordingly.
381 * For the asynchronous, non-blocking, version of this function, see
382 * g_buffered_input_stream_fill_async().
384 * Returns: the number of bytes read into @stream's buffer, up to @count,
388 g_buffered_input_stream_fill (GBufferedInputStream *stream,
390 GCancellable *cancellable,
393 GBufferedInputStreamClass *class;
394 GInputStream *input_stream;
397 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
399 input_stream = G_INPUT_STREAM (stream);
401 if (g_input_stream_is_closed (input_stream))
403 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
404 _("Stream is already closed"));
408 if (g_input_stream_has_pending (input_stream))
410 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
411 _("Stream has outstanding operation"));
415 g_input_stream_set_pending (input_stream, TRUE);
418 g_push_current_cancellable (cancellable);
420 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
421 res = class->fill (stream, count, cancellable, error);
424 g_pop_current_cancellable (cancellable);
426 g_input_stream_set_pending (input_stream, FALSE);
432 async_fill_callback_wrapper (GObject *source_object,
436 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438 g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
439 (*stream->priv->outstanding_callback) (source_object, res, user_data);
440 g_object_unref (stream);
444 * g_buffered_input_stream_fill_async:
445 * @stream: #GBufferedInputStream.
447 * @io_priority: the io priority of the request. the io priority of the request.
448 * @cancellable: optional #GCancellable object
449 * @callback: a #GAsyncReadyCallback.
450 * @user_data: a #gpointer.
452 * Reads data into @stream's buffer asynchronously, up to @count size.
453 * @io_priority can be used to prioritize reads. For the synchronous
454 * version of this function, see g_buffered_input_stream_fill().
459 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
462 GCancellable *cancellable,
463 GAsyncReadyCallback callback,
466 GBufferedInputStreamClass *class;
467 GSimpleAsyncResult *simple;
469 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
473 simple = g_simple_async_result_new (G_OBJECT (stream),
476 g_buffered_input_stream_fill_async);
477 g_simple_async_result_complete_in_idle (simple);
478 g_object_unref (simple);
482 if (((gssize) count) < 0)
484 g_simple_async_report_error_in_idle (G_OBJECT (stream),
487 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
488 _("Too large count value passed to g_input_stream_read_async"));
492 if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
494 g_simple_async_report_error_in_idle (G_OBJECT (stream),
497 G_IO_ERROR, G_IO_ERROR_CLOSED,
498 _("Stream is already closed"));
502 if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
504 g_simple_async_report_error_in_idle (G_OBJECT (stream),
507 G_IO_ERROR, G_IO_ERROR_PENDING,
508 _("Stream has outstanding operation"));
512 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
514 g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
515 stream->priv->outstanding_callback = callback;
516 g_object_ref (stream);
517 class->fill_async (stream, count, io_priority, cancellable,
518 async_fill_callback_wrapper, user_data);
522 * g_buffered_input_stream_fill_finish:
523 * @stream: a #GBufferedInputStream.
524 * @result: a #GAsyncResult.
527 * Finishes an asynchronous read.
529 * Returns: a #gssize of the read stream, or %-1 on an error.
532 g_buffered_input_stream_fill_finish (GBufferedInputStream *stream,
533 GAsyncResult *result,
536 GSimpleAsyncResult *simple;
537 GBufferedInputStreamClass *class;
539 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
542 if (G_IS_SIMPLE_ASYNC_RESULT (result))
544 simple = G_SIMPLE_ASYNC_RESULT (result);
545 if (g_simple_async_result_propagate_error (simple, error))
548 /* Special case read of 0 bytes */
549 if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
553 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554 return class->fill_finish (stream, result, error);
558 * g_buffered_input_stream_get_available:
559 * @stream: #GBufferedInputStream.
561 * Gets the size of the available data within the stream.
563 * Returns: size of the available stream.
566 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
568 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
570 return stream->priv->end - stream->priv->pos;
574 * g_buffered_input_stream_peek:
575 * @stream: a #GBufferedInputStream.
576 * @buffer: a pointer to an allocated chunk of memory.
580 * Peeks in the buffer, copying data of size @count into @buffer, offset
583 * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
586 g_buffered_input_stream_peek (GBufferedInputStream *stream,
594 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595 g_return_val_if_fail (buffer != NULL, -1);
597 available = g_buffered_input_stream_get_available (stream);
599 if (offset > available)
602 end = MIN (offset + count, available);
603 count = end - offset;
605 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
610 * g_buffered_input_stream_peek_buffer:
611 * @stream: a #GBufferedInputStream.
612 * @count: a #gsize to get the number of bytes available in the buffer.
614 * Returns the buffer with the currently available bytes. The returned
615 * buffer must not be modified and will become invalid when reading from
616 * the stream or filling the buffer.
618 * Returns: read-only buffer
621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
624 GBufferedInputStreamPrivate *priv;
626 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
631 *count = priv->end - priv->pos;
634 return priv->buffer + priv->pos;
638 compact_buffer (GBufferedInputStream *stream)
640 GBufferedInputStreamPrivate *priv;
645 current_size = priv->end - priv->pos;
647 g_memmove (priv->buffer,
648 priv->buffer + priv->pos,
652 priv->end = current_size;
656 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
658 GCancellable *cancellable,
661 GBufferedInputStreamPrivate *priv;
662 GInputStream *base_stream;
671 in_buffer = priv->end - priv->pos;
673 /* Never fill more than can fit in the buffer */
674 count = MIN (count, priv->len - in_buffer);
676 /* If requested length does not fit at end, compact */
677 if (priv->len - priv->end < count)
678 compact_buffer (stream);
680 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
681 nread = g_input_stream_read (base_stream,
682 priv->buffer + priv->end,
694 g_buffered_input_stream_skip (GInputStream *stream,
696 GCancellable *cancellable,
699 GBufferedInputStream *bstream;
700 GBufferedInputStreamPrivate *priv;
701 GInputStream *base_stream;
702 gsize available, bytes_skipped;
705 bstream = G_BUFFERED_INPUT_STREAM (stream);
706 priv = bstream->priv;
708 available = priv->end - priv->pos;
710 if (count <= available)
716 /* Full request not available, skip all currently availbile and request refill for more */
720 bytes_skipped = available;
723 if (bytes_skipped > 0)
724 error = NULL; /* Ignore further errors if we already read some data */
726 if (count > priv->len)
728 /* Large request, shortcut buffer */
730 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
732 nread = g_input_stream_skip (base_stream,
737 if (nread < 0 && bytes_skipped == 0)
741 bytes_skipped += nread;
743 return bytes_skipped;
746 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
747 nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
748 g_input_stream_set_pending (stream, TRUE); /* enable again */
752 if (bytes_skipped == 0)
755 return bytes_skipped;
758 available = priv->end - priv->pos;
759 count = MIN (count, available);
761 bytes_skipped += count;
764 return bytes_skipped;
768 g_buffered_input_stream_read (GInputStream *stream,
771 GCancellable *cancellable,
774 GBufferedInputStream *bstream;
775 GBufferedInputStreamPrivate *priv;
776 GInputStream *base_stream;
777 gsize available, bytes_read;
780 bstream = G_BUFFERED_INPUT_STREAM (stream);
781 priv = bstream->priv;
783 available = priv->end - priv->pos;
785 if (count <= available)
787 memcpy (buffer, priv->buffer + priv->pos, count);
792 /* Full request not available, read all currently availbile and request refill for more */
794 memcpy (buffer, priv->buffer + priv->pos, available);
797 bytes_read = available;
801 error = NULL; /* Ignore further errors if we already read some data */
803 if (count > priv->len)
805 /* Large request, shortcut buffer */
807 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
809 nread = g_input_stream_read (base_stream,
810 (char *)buffer + bytes_read,
815 if (nread < 0 && bytes_read == 0)
824 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
825 nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
826 g_input_stream_set_pending (stream, TRUE); /* enable again */
835 available = priv->end - priv->pos;
836 count = MIN (count, available);
838 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
846 * g_buffered_input_stream_read_byte:
847 * @stream: #GBufferedInputStream.
848 * @cancellable: optional #GCancellable object, %NULL to ignore.
849 * @error: location to store the error occuring, or %NULL to ignore.
851 * Tries to read a single byte from the stream or the buffer. Will block
854 * On success, the byte read from the stream is returned. On end of stream
855 * -1 is returned but it's not an exceptional error and @error is not set.
857 * If @cancellable is not %NULL, then the operation can be cancelled by
858 * triggering the cancellable object from another thread. If the operation
859 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
860 * operation was partially finished when the operation was cancelled the
861 * partial result will be returned, without an error.
863 * On error -1 is returned and @error is set accordingly.
865 * Returns: the byte read from the @stream, or -1 on end of stream or error.
868 g_buffered_input_stream_read_byte (GBufferedInputStream *stream,
869 GCancellable *cancellable,
872 GBufferedInputStreamPrivate *priv;
873 GInputStream *input_stream;
877 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
880 input_stream = G_INPUT_STREAM (stream);
882 if (g_input_stream_is_closed (input_stream))
884 g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
885 _("Stream is already closed"));
889 if (g_input_stream_has_pending (input_stream))
891 g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
892 _("Stream has outstanding operation"));
896 available = priv->end - priv->pos;
899 return priv->buffer[priv->pos++];
901 /* Byte not available, request refill for more */
904 g_push_current_cancellable (cancellable);
909 nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
912 g_pop_current_cancellable (cancellable);
915 return -1; /* error or end of stream */
917 return priv->buffer[priv->pos++];
920 /* ************************** */
921 /* Async stuff implementation */
922 /* ************************** */
925 fill_async_callback (GObject *source_object,
926 GAsyncResult *result,
931 GSimpleAsyncResult *simple;
936 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
939 g_simple_async_result_set_op_res_gssize (simple, res);
942 g_simple_async_result_set_from_error (simple, error);
943 g_error_free (error);
946 /* Complete immediately, not in idle, since we're already in a mainloop callout */
947 g_simple_async_result_complete (simple);
948 g_object_unref (simple);
952 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
955 GCancellable *cancellable,
956 GAsyncReadyCallback callback,
959 GBufferedInputStreamPrivate *priv;
960 GInputStream *base_stream;
961 GSimpleAsyncResult *simple;
969 in_buffer = priv->end - priv->pos;
971 /* Never fill more than can fit in the buffer */
972 count = MIN (count, priv->len - in_buffer);
974 /* If requested length does not fit at end, compact */
975 if (priv->len - priv->end < count)
976 compact_buffer (stream);
978 simple = g_simple_async_result_new (G_OBJECT (stream),
980 g_buffered_input_stream_real_fill_async);
982 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
983 g_input_stream_read_async (base_stream,
984 priv->buffer + priv->end,
993 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
994 GAsyncResult *result,
997 GSimpleAsyncResult *simple;
1000 simple = G_SIMPLE_ASYNC_RESULT (result);
1001 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1003 nread = g_simple_async_result_get_op_res_gssize (simple);
1014 free_read_async_data (gpointer _data)
1016 ReadAsyncData *data = _data;
1017 g_slice_free (ReadAsyncData, data);
1021 large_read_callback (GObject *source_object,
1022 GAsyncResult *result,
1025 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1026 ReadAsyncData *data;
1030 data = g_simple_async_result_get_op_res_gpointer (simple);
1033 nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1036 /* Only report the error if we've not already read some data */
1037 if (nread < 0 && data->bytes_read == 0)
1038 g_simple_async_result_set_from_error (simple, error);
1041 data->bytes_read += nread;
1044 g_error_free (error);
1046 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1047 g_simple_async_result_complete (simple);
1048 g_object_unref (simple);
1052 read_fill_buffer_callback (GObject *source_object,
1053 GAsyncResult *result,
1056 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1057 GBufferedInputStream *bstream;
1058 GBufferedInputStreamPrivate *priv;
1059 ReadAsyncData *data;
1064 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1065 priv = bstream->priv;
1067 g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1069 data = g_simple_async_result_get_op_res_gpointer (simple);
1072 nread = g_buffered_input_stream_fill_finish (bstream,
1075 if (nread < 0 && data->bytes_read == 0)
1076 g_simple_async_result_set_from_error (simple, error);
1081 available = priv->end - priv->pos;
1082 data->count = MIN (data->count, available);
1084 memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1085 data->bytes_read += data->count;
1086 priv->pos += data->count;
1090 g_error_free (error);
1092 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1093 g_simple_async_result_complete (simple);
1094 g_object_unref (simple);
1098 g_buffered_input_stream_read_async (GInputStream *stream,
1102 GCancellable *cancellable,
1103 GAsyncReadyCallback callback,
1106 GBufferedInputStream *bstream;
1107 GBufferedInputStreamPrivate *priv;
1108 GInputStream *base_stream;
1110 GSimpleAsyncResult *simple;
1111 ReadAsyncData *data;
1113 bstream = G_BUFFERED_INPUT_STREAM (stream);
1114 priv = bstream->priv;
1116 data = g_slice_new (ReadAsyncData);
1117 data->buffer = buffer;
1118 data->bytes_read = 0;
1119 simple = g_simple_async_result_new (G_OBJECT (stream),
1120 callback, user_data,
1121 g_buffered_input_stream_read_async);
1122 g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1124 available = priv->end - priv->pos;
1126 if (count <= available)
1128 memcpy (buffer, priv->buffer + priv->pos, count);
1130 data->bytes_read = count;
1132 g_simple_async_result_complete_in_idle (simple);
1133 g_object_unref (simple);
1138 /* Full request not available, read all currently availbile and request refill for more */
1140 memcpy (buffer, priv->buffer + priv->pos, available);
1146 data->bytes_read = available;
1147 data->count = count;
1149 if (count > priv->len)
1151 /* Large request, shortcut buffer */
1153 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1155 g_input_stream_read_async (base_stream,
1156 (char *)buffer + data->bytes_read,
1158 io_priority, cancellable,
1159 large_read_callback,
1164 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1165 g_buffered_input_stream_fill_async (bstream, priv->len,
1166 io_priority, cancellable,
1167 read_fill_buffer_callback, simple);
1172 g_buffered_input_stream_read_finish (GInputStream *stream,
1173 GAsyncResult *result,
1176 GSimpleAsyncResult *simple;
1177 ReadAsyncData *data;
1179 simple = G_SIMPLE_ASYNC_RESULT (result);
1181 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1183 data = g_simple_async_result_get_op_res_gpointer (simple);
1185 return data->bytes_read;
1189 gssize bytes_skipped;
1194 free_skip_async_data (gpointer _data)
1196 SkipAsyncData *data = _data;
1197 g_slice_free (SkipAsyncData, data);
1201 large_skip_callback (GObject *source_object,
1202 GAsyncResult *result,
1205 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1206 SkipAsyncData *data;
1210 data = g_simple_async_result_get_op_res_gpointer (simple);
1213 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1216 /* Only report the error if we've not already read some data */
1217 if (nread < 0 && data->bytes_skipped == 0)
1218 g_simple_async_result_set_from_error (simple, error);
1221 data->bytes_skipped += nread;
1224 g_error_free (error);
1226 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1227 g_simple_async_result_complete (simple);
1228 g_object_unref (simple);
1232 skip_fill_buffer_callback (GObject *source_object,
1233 GAsyncResult *result,
1236 GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1237 GBufferedInputStream *bstream;
1238 GBufferedInputStreamPrivate *priv;
1239 SkipAsyncData *data;
1244 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1245 priv = bstream->priv;
1247 g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1249 data = g_simple_async_result_get_op_res_gpointer (simple);
1252 nread = g_buffered_input_stream_fill_finish (bstream,
1255 if (nread < 0 && data->bytes_skipped == 0)
1256 g_simple_async_result_set_from_error (simple, error);
1261 available = priv->end - priv->pos;
1262 data->count = MIN (data->count, available);
1264 data->bytes_skipped += data->count;
1265 priv->pos += data->count;
1269 g_error_free (error);
1271 /* Complete immediately, not in idle, since we're already in a mainloop callout */
1272 g_simple_async_result_complete (simple);
1273 g_object_unref (simple);
1277 g_buffered_input_stream_skip_async (GInputStream *stream,
1280 GCancellable *cancellable,
1281 GAsyncReadyCallback callback,
1284 GBufferedInputStream *bstream;
1285 GBufferedInputStreamPrivate *priv;
1286 GInputStream *base_stream;
1288 GSimpleAsyncResult *simple;
1289 SkipAsyncData *data;
1291 bstream = G_BUFFERED_INPUT_STREAM (stream);
1292 priv = bstream->priv;
1294 data = g_slice_new (SkipAsyncData);
1295 data->bytes_skipped = 0;
1296 simple = g_simple_async_result_new (G_OBJECT (stream),
1297 callback, user_data,
1298 g_buffered_input_stream_skip_async);
1299 g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1301 available = priv->end - priv->pos;
1303 if (count <= available)
1306 data->bytes_skipped = count;
1308 g_simple_async_result_complete_in_idle (simple);
1309 g_object_unref (simple);
1314 /* Full request not available, skip all currently availbile and request refill for more */
1321 data->bytes_skipped = available;
1322 data->count = count;
1324 if (count > priv->len)
1326 /* Large request, shortcut buffer */
1328 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1330 g_input_stream_skip_async (base_stream,
1332 io_priority, cancellable,
1333 large_skip_callback,
1338 g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1339 g_buffered_input_stream_fill_async (bstream, priv->len,
1340 io_priority, cancellable,
1341 skip_fill_buffer_callback, simple);
1346 g_buffered_input_stream_skip_finish (GInputStream *stream,
1347 GAsyncResult *result,
1350 GSimpleAsyncResult *simple;
1351 SkipAsyncData *data;
1353 simple = G_SIMPLE_ASYNC_RESULT (result);
1355 g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1357 data = g_simple_async_result_get_op_res_gpointer (simple);
1359 return data->bytes_skipped;
1362 /* vim: ts=2 sw=2 et */