Tizen 2.1 base
[platform/upstream/glib2.0.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34
35 /**
36  * SECTION:gbufferedinputstream
37  * @short_description: Buffered Input Stream
38  * @include: gio/gio.h
39  * @see_also: #GFilterInputStream, #GInputStream
40  *
41  * Buffered input stream implements #GFilterInputStream and provides
42  * for buffered reads.
43  *
44  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45  *
46  * To create a buffered input stream, use g_buffered_input_stream_new(),
47  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
48  * construction.
49  *
50  * To get the size of a buffer within a buffered input stream, use
51  * g_buffered_input_stream_get_buffer_size(). To change the size of a
52  * buffered input stream's buffer, use
53  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
54  * cannot be reduced below the size of the data within the buffer.
55  */
56
57
58 #define DEFAULT_BUFFER_SIZE 4096
59
60 struct _GBufferedInputStreamPrivate {
61   guint8 *buffer;
62   gsize   len;
63   gsize   pos;
64   gsize   end;
65   GAsyncReadyCallback outstanding_callback;
66 };
67
/* GObject property identifiers for this class. */
enum
{
  PROP_0,       /* first entry, not used for any installed property */
  PROP_BUFSIZE  /* the "buffer-size" property */
};
72
73 static void g_buffered_input_stream_set_property  (GObject      *object,
74                                                    guint         prop_id,
75                                                    const GValue *value,
76                                                    GParamSpec   *pspec);
77
78 static void g_buffered_input_stream_get_property  (GObject      *object,
79                                                    guint         prop_id,
80                                                    GValue       *value,
81                                                    GParamSpec   *pspec);
82 static void g_buffered_input_stream_finalize      (GObject *object);
83
84
85 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
86                                                         gsize                  count,
87                                                         GCancellable          *cancellable,
88                                                         GError               **error);
89 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
90                                                         gsize                  count,
91                                                         int                    io_priority,
92                                                         GCancellable          *cancellable,
93                                                         GAsyncReadyCallback    callback,
94                                                         gpointer               user_data);
95 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
96                                                         GAsyncResult          *result,
97                                                         GError               **error);
98 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
99                                                         void                  *buffer,
100                                                         gsize                  count,
101                                                         GCancellable          *cancellable,
102                                                         GError               **error);
103 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
104                                                         void                  *buffer,
105                                                         gsize                  count,
106                                                         int                    io_priority,
107                                                         GCancellable          *cancellable,
108                                                         GAsyncReadyCallback    callback,
109                                                         gpointer               user_data);
110 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
111                                                         GAsyncResult          *result,
112                                                         GError               **error);
113 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
114                                                         gssize                 count,
115                                                         GCancellable          *cancellable,
116                                                         GError               **error);
117 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
118                                                         gssize                 count,
119                                                         int                    io_priority,
120                                                         GCancellable          *cancellable,
121                                                         GAsyncReadyCallback    callback,
122                                                         gpointer               user_data);
123 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
124                                                         GAsyncResult          *result,
125                                                         GError               **error);
126
127 static void compact_buffer (GBufferedInputStream *stream);
128
129 G_DEFINE_TYPE (GBufferedInputStream,
130                g_buffered_input_stream,
131                G_TYPE_FILTER_INPUT_STREAM)
132
133
134 static void
135 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
136 {
137   GObjectClass *object_class;
138   GInputStreamClass *istream_class;
139   GBufferedInputStreamClass *bstream_class;
140
141   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
142
143   object_class = G_OBJECT_CLASS (klass);
144   object_class->get_property = g_buffered_input_stream_get_property;
145   object_class->set_property = g_buffered_input_stream_set_property;
146   object_class->finalize     = g_buffered_input_stream_finalize;
147
148   istream_class = G_INPUT_STREAM_CLASS (klass);
149   istream_class->skip = g_buffered_input_stream_skip;
150   istream_class->skip_async  = g_buffered_input_stream_skip_async;
151   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
152   istream_class->read_fn = g_buffered_input_stream_read;
153   istream_class->read_async  = g_buffered_input_stream_read_async;
154   istream_class->read_finish = g_buffered_input_stream_read_finish;
155
156   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
157   bstream_class->fill = g_buffered_input_stream_real_fill;
158   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
159   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
160
161   g_object_class_install_property (object_class,
162                                    PROP_BUFSIZE,
163                                    g_param_spec_uint ("buffer-size",
164                                                       P_("Buffer Size"),
165                                                       P_("The size of the backend buffer"),
166                                                       1,
167                                                       G_MAXUINT,
168                                                       DEFAULT_BUFFER_SIZE,
169                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
170                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172
173 }
174
175 /**
176  * g_buffered_input_stream_get_buffer_size:
177  * @stream: a #GBufferedInputStream
178  *
179  * Gets the size of the input buffer.
180  *
181  * Returns: the current buffer size.
182  */
183 gsize
184 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_input_stream_set_buffer_size:
193  * @stream: a #GBufferedInputStream
194  * @size: a #gsize
195  *
196  * Sets the size of the internal buffer of @stream to @size, or to the
197  * size of the contents of the buffer. The buffer can never be resized
198  * smaller than its current contents.
199  */
200 void
201 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
202                                          gsize                 size)
203 {
204   GBufferedInputStreamPrivate *priv;
205   gsize in_buffer;
206   guint8 *buffer;
207
208   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
209
210   priv = stream->priv;
211
212   if (priv->len == size)
213     return;
214
215   if (priv->buffer)
216     {
217       in_buffer = priv->end - priv->pos;
218
219       /* Never resize smaller than current buffer contents */
220       size = MAX (size, in_buffer);
221
222       buffer = g_malloc (size);
223       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
224       priv->len = size;
225       priv->pos = 0;
226       priv->end = in_buffer;
227       g_free (priv->buffer);
228       priv->buffer = buffer;
229     }
230   else
231     {
232       priv->len = size;
233       priv->pos = 0;
234       priv->end = 0;
235       priv->buffer = g_malloc (size);
236     }
237
238   g_object_notify (G_OBJECT (stream), "buffer-size");
239 }
240
241 static void
242 g_buffered_input_stream_set_property (GObject      *object,
243                                       guint         prop_id,
244                                       const GValue *value,
245                                       GParamSpec   *pspec)
246 {
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250
251   switch (prop_id)
252     {
253     case PROP_BUFSIZE:
254       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
255       break;
256
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259       break;
260     }
261 }
262
263 static void
264 g_buffered_input_stream_get_property (GObject    *object,
265                                       guint       prop_id,
266                                       GValue     *value,
267                                       GParamSpec *pspec)
268 {
269   GBufferedInputStreamPrivate *priv;
270   GBufferedInputStream        *bstream;
271
272   bstream = G_BUFFERED_INPUT_STREAM (object);
273   priv = bstream->priv;
274
275   switch (prop_id)
276     {
277     case PROP_BUFSIZE:
278       g_value_set_uint (value, priv->len);
279       break;
280
281     default:
282       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283       break;
284     }
285 }
286
287 static void
288 g_buffered_input_stream_finalize (GObject *object)
289 {
290   GBufferedInputStreamPrivate *priv;
291   GBufferedInputStream        *stream;
292
293   stream = G_BUFFERED_INPUT_STREAM (object);
294   priv = stream->priv;
295
296   g_free (priv->buffer);
297
298   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
299 }
300
301 static void
302 g_buffered_input_stream_init (GBufferedInputStream *stream)
303 {
304   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
305                                               G_TYPE_BUFFERED_INPUT_STREAM,
306                                               GBufferedInputStreamPrivate);
307 }
308
309
310 /**
311  * g_buffered_input_stream_new:
312  * @base_stream: a #GInputStream
313  *
314  * Creates a new #GInputStream from the given @base_stream, with
315  * a buffer set to the default size (4 kilobytes).
316  *
317  * Returns: a #GInputStream for the given @base_stream.
318  */
319 GInputStream *
320 g_buffered_input_stream_new (GInputStream *base_stream)
321 {
322   GInputStream *stream;
323
324   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
325
326   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
327                          "base-stream", base_stream,
328                          NULL);
329
330   return stream;
331 }
332
333 /**
334  * g_buffered_input_stream_new_sized:
335  * @base_stream: a #GInputStream
336  * @size: a #gsize
337  *
338  * Creates a new #GBufferedInputStream from the given @base_stream,
339  * with a buffer set to @size.
340  *
341  * Returns: a #GInputStream.
342  */
343 GInputStream *
344 g_buffered_input_stream_new_sized (GInputStream *base_stream,
345                                    gsize         size)
346 {
347   GInputStream *stream;
348
349   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
350
351   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
352                          "base-stream", base_stream,
353                          "buffer-size", (guint)size,
354                          NULL);
355
356   return stream;
357 }
358
359 /**
360  * g_buffered_input_stream_fill:
361  * @stream: a #GBufferedInputStream
362  * @count: the number of bytes that will be read from the stream
363  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
364  * @error: location to store the error occurring, or %NULL to ignore
365  *
366  * Tries to read @count bytes from the stream into the buffer.
367  * Will block during this read.
368  *
369  * If @count is zero, returns zero and does nothing. A value of @count
370  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
371  *
372  * On success, the number of bytes read into the buffer is returned.
373  * It is not an error if this is not the same as the requested size, as it
374  * can happen e.g. near the end of a file. Zero is returned on end of file
375  * (or if @count is zero),  but never otherwise.
376  *
377  * If @count is -1 then the attempted read size is equal to the number of
378  * bytes that are required to fill the buffer.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  *
388  * For the asynchronous, non-blocking, version of this function, see
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count,
392  *     or -1 on error.
393  */
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405
406   input_stream = G_INPUT_STREAM (stream);
407
408   if (count < -1)
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
411                    _("Too large count value passed to %s"), G_STRFUNC);
412       return -1;
413     }
414
415   if (!g_input_stream_set_pending (input_stream, error))
416     return -1;
417
418   if (cancellable)
419     g_cancellable_push_current (cancellable);
420
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_cancellable_pop_current (cancellable);
426
427   g_input_stream_clear_pending (input_stream);
428
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: a #GBufferedInputStream
447  * @count: the number of bytes that will be read from the stream
448  * @io_priority: the <link linkend="io-priority">I/O priority</link>
449  *     of the request
450  * @cancellable: (allow-none): optional #GCancellable object
451  * @callback: (scope async): a #GAsyncReadyCallback
452  * @user_data: (closure): a #gpointer
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  *
458  * If @count is -1 then the attempted read size is equal to the number
459  * of bytes that are required to fill the buffer.
460  */
461 void
462 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
463                                     gssize                count,
464                                     int                   io_priority,
465                                     GCancellable         *cancellable,
466                                     GAsyncReadyCallback   callback,
467                                     gpointer              user_data)
468 {
469   GBufferedInputStreamClass *class;
470   GSimpleAsyncResult *simple;
471   GError *error = NULL;
472
473   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
474
475   if (count == 0)
476     {
477       simple = g_simple_async_result_new (G_OBJECT (stream),
478                                           callback,
479                                           user_data,
480                                           g_buffered_input_stream_fill_async);
481       g_simple_async_result_complete_in_idle (simple);
482       g_object_unref (simple);
483       return;
484     }
485
486   if (count < -1)
487     {
488       g_simple_async_report_error_in_idle (G_OBJECT (stream),
489                                            callback,
490                                            user_data,
491                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
492                                            _("Too large count value passed to %s"),
493                                            G_STRFUNC);
494       return;
495     }
496
497   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
498     {
499       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
500                                             callback,
501                                             user_data,
502                                             error);
503       return;
504     }
505
506   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
507
508   stream->priv->outstanding_callback = callback;
509   g_object_ref (stream);
510   class->fill_async (stream, count, io_priority, cancellable,
511                      async_fill_callback_wrapper, user_data);
512 }
513
514 /**
515  * g_buffered_input_stream_fill_finish:
516  * @stream: a #GBufferedInputStream
517  * @result: a #GAsyncResult
518  * @error: a #GError
519  *
520  * Finishes an asynchronous read.
521  *
522  * Returns: a #gssize of the read stream, or %-1 on an error.
523  */
524 gssize
525 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
526                                      GAsyncResult          *result,
527                                      GError               **error)
528 {
529   GSimpleAsyncResult *simple;
530   GBufferedInputStreamClass *class;
531
532   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
533   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
534
535   if (G_IS_SIMPLE_ASYNC_RESULT (result))
536     {
537       simple = G_SIMPLE_ASYNC_RESULT (result);
538       if (g_simple_async_result_propagate_error (simple, error))
539         return -1;
540
541       /* Special case read of 0 bytes */
542       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
543         return 0;
544     }
545
546   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
547   return class->fill_finish (stream, result, error);
548 }
549
550 /**
551  * g_buffered_input_stream_get_available:
552  * @stream: #GBufferedInputStream
553  *
554  * Gets the size of the available data within the stream.
555  *
556  * Returns: size of the available stream.
557  */
558 gsize
559 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
560 {
561   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
562
563   return stream->priv->end - stream->priv->pos;
564 }
565
566 /**
567  * g_buffered_input_stream_peek:
568  * @stream: a #GBufferedInputStream
569  * @buffer: (array length=count) (element-type guint8): a pointer to
570  *   an allocated chunk of memory
571  * @offset: a #gsize
572  * @count: a #gsize
573  *
574  * Peeks in the buffer, copying data of size @count into @buffer,
575  * offset @offset bytes.
576  *
577  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
578  */
579 gsize
580 g_buffered_input_stream_peek (GBufferedInputStream *stream,
581                               void                 *buffer,
582                               gsize                 offset,
583                               gsize                 count)
584 {
585   gsize available;
586   gsize end;
587
588   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
589   g_return_val_if_fail (buffer != NULL, -1);
590
591   available = g_buffered_input_stream_get_available (stream);
592
593   if (offset > available)
594     return 0;
595
596   end = MIN (offset + count, available);
597   count = end - offset;
598
599   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
600   return count;
601 }
602
603 /**
604  * g_buffered_input_stream_peek_buffer:
605  * @stream: a #GBufferedInputStream
606  * @count: (out): a #gsize to get the number of bytes available in the buffer
607  *
608  * Returns the buffer with the currently available bytes. The returned
609  * buffer must not be modified and will become invalid when reading from
610  * the stream or filling the buffer.
611  *
612  * Returns: (array length=count) (element-type guint8) (transfer none):
613  *          read-only buffer
614  */
615 const void*
616 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
617                                      gsize                *count)
618 {
619   GBufferedInputStreamPrivate *priv;
620
621   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
622
623   priv = stream->priv;
624
625   if (count)
626     *count = priv->end - priv->pos;
627
628   return priv->buffer + priv->pos;
629 }
630
631 static void
632 compact_buffer (GBufferedInputStream *stream)
633 {
634   GBufferedInputStreamPrivate *priv;
635   gsize current_size;
636
637   priv = stream->priv;
638
639   current_size = priv->end - priv->pos;
640
641   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
642
643   priv->pos = 0;
644   priv->end = current_size;
645 }
646
647 static gssize
648 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
649                                    gssize                 count,
650                                    GCancellable          *cancellable,
651                                    GError               **error)
652 {
653   GBufferedInputStreamPrivate *priv;
654   GInputStream *base_stream;
655   gssize nread;
656   gsize in_buffer;
657
658   priv = stream->priv;
659
660   if (count == -1)
661     count = priv->len;
662
663   in_buffer = priv->end - priv->pos;
664
665   /* Never fill more than can fit in the buffer */
666   count = MIN (count, priv->len - in_buffer);
667
668   /* If requested length does not fit at end, compact */
669   if (priv->len - priv->end < count)
670     compact_buffer (stream);
671
672   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
673   nread = g_input_stream_read (base_stream,
674                                priv->buffer + priv->end,
675                                count,
676                                cancellable,
677                                error);
678
679   if (nread > 0)
680     priv->end += nread;
681
682   return nread;
683 }
684
685 static gssize
686 g_buffered_input_stream_skip (GInputStream  *stream,
687                               gsize          count,
688                               GCancellable  *cancellable,
689                               GError       **error)
690 {
691   GBufferedInputStream        *bstream;
692   GBufferedInputStreamPrivate *priv;
693   GBufferedInputStreamClass *class;
694   GInputStream *base_stream;
695   gsize available, bytes_skipped;
696   gssize nread;
697
698   bstream = G_BUFFERED_INPUT_STREAM (stream);
699   priv = bstream->priv;
700
701   available = priv->end - priv->pos;
702
703   if (count <= available)
704     {
705       priv->pos += count;
706       return count;
707     }
708
709   /* Full request not available, skip all currently available and
710    * request refill for more
711    */
712
713   priv->pos = 0;
714   priv->end = 0;
715   bytes_skipped = available;
716   count -= available;
717
718   if (bytes_skipped > 0)
719     error = NULL; /* Ignore further errors if we already read some data */
720
721   if (count > priv->len)
722     {
723       /* Large request, shortcut buffer */
724
725       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
726
727       nread = g_input_stream_skip (base_stream,
728                                    count,
729                                    cancellable,
730                                    error);
731
732       if (nread < 0 && bytes_skipped == 0)
733         return -1;
734
735       if (nread > 0)
736         bytes_skipped += nread;
737
738       return bytes_skipped;
739     }
740
741   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
742   nread = class->fill (bstream, priv->len, cancellable, error);
743
744   if (nread < 0)
745     {
746       if (bytes_skipped == 0)
747         return -1;
748       else
749         return bytes_skipped;
750     }
751
752   available = priv->end - priv->pos;
753   count = MIN (count, available);
754
755   bytes_skipped += count;
756   priv->pos += count;
757
758   return bytes_skipped;
759 }
760
761 static gssize
762 g_buffered_input_stream_read (GInputStream *stream,
763                               void         *buffer,
764                               gsize         count,
765                               GCancellable *cancellable,
766                               GError      **error)
767 {
768   GBufferedInputStream        *bstream;
769   GBufferedInputStreamPrivate *priv;
770   GBufferedInputStreamClass *class;
771   GInputStream *base_stream;
772   gsize available, bytes_read;
773   gssize nread;
774
775   bstream = G_BUFFERED_INPUT_STREAM (stream);
776   priv = bstream->priv;
777
778   available = priv->end - priv->pos;
779
780   if (count <= available)
781     {
782       memcpy (buffer, priv->buffer + priv->pos, count);
783       priv->pos += count;
784       return count;
785     }
786
787   /* Full request not available, read all currently available and
788    * request refill for more
789    */
790
791   memcpy (buffer, priv->buffer + priv->pos, available);
792   priv->pos = 0;
793   priv->end = 0;
794   bytes_read = available;
795   count -= available;
796
797   if (bytes_read > 0)
798     error = NULL; /* Ignore further errors if we already read some data */
799
800   if (count > priv->len)
801     {
802       /* Large request, shortcut buffer */
803
804       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
805
806       nread = g_input_stream_read (base_stream,
807                                    (char *)buffer + bytes_read,
808                                    count,
809                                    cancellable,
810                                    error);
811
812       if (nread < 0 && bytes_read == 0)
813         return -1;
814
815       if (nread > 0)
816         bytes_read += nread;
817
818       return bytes_read;
819     }
820
821   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
822   nread = class->fill (bstream, priv->len, cancellable, error);
823   if (nread < 0)
824     {
825       if (bytes_read == 0)
826         return -1;
827       else
828         return bytes_read;
829     }
830
831   available = priv->end - priv->pos;
832   count = MIN (count, available);
833
834   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
835   bytes_read += count;
836   priv->pos += count;
837
838   return bytes_read;
839 }
840
841 /**
842  * g_buffered_input_stream_read_byte:
843  * @stream: a #GBufferedInputStream
844  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
845  * @error: location to store the error occurring, or %NULL to ignore
846  *
847  * Tries to read a single byte from the stream or the buffer. Will block
848  * during this read.
849  *
850  * On success, the byte read from the stream is returned. On end of stream
851  * -1 is returned but it's not an exceptional error and @error is not set.
852  *
853  * If @cancellable is not %NULL, then the operation can be cancelled by
854  * triggering the cancellable object from another thread. If the operation
855  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
856  * operation was partially finished when the operation was cancelled the
857  * partial result will be returned, without an error.
858  *
859  * On error -1 is returned and @error is set accordingly.
860  *
861  * Returns: the byte read from the @stream, or -1 on end of stream or error.
862  */
863 int
864 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
865                                    GCancellable          *cancellable,
866                                    GError               **error)
867 {
868   GBufferedInputStreamPrivate *priv;
869   GBufferedInputStreamClass *class;
870   GInputStream *input_stream;
871   gsize available;
872   gssize nread;
873
874   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
875
876   priv = stream->priv;
877   input_stream = G_INPUT_STREAM (stream);
878
879   if (g_input_stream_is_closed (input_stream))
880     {
881       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
882                            _("Stream is already closed"));
883       return -1;
884     }
885
886   if (!g_input_stream_set_pending (input_stream, error))
887     return -1;
888
889   available = priv->end - priv->pos;
890
891   if (available != 0)
892     {
893       g_input_stream_clear_pending (input_stream);
894       return priv->buffer[priv->pos++];
895     }
896
897   /* Byte not available, request refill for more */
898
899   if (cancellable)
900     g_cancellable_push_current (cancellable);
901
902   priv->pos = 0;
903   priv->end = 0;
904
905   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
906   nread = class->fill (stream, priv->len, cancellable, error);
907
908   if (cancellable)
909     g_cancellable_pop_current (cancellable);
910
911   g_input_stream_clear_pending (input_stream);
912
913   if (nread <= 0)
914     return -1; /* error or end of stream */
915
916   return priv->buffer[priv->pos++];
917 }
918
919 /* ************************** */
920 /* Async stuff implementation */
921 /* ************************** */
922
923 static void
924 fill_async_callback (GObject      *source_object,
925                      GAsyncResult *result,
926                      gpointer      user_data)
927 {
928   GError *error;
929   gssize res;
930   GSimpleAsyncResult *simple;
931
932   simple = user_data;
933
934   error = NULL;
935   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
936                                     result, &error);
937
938   g_simple_async_result_set_op_res_gssize (simple, res);
939   if (res == -1)
940     {
941       g_simple_async_result_take_error (simple, error);
942     }
943   else
944     {
945       GBufferedInputStreamPrivate *priv;
946       GObject *object;
947
948       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
949       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
950
951       g_assert_cmpint (priv->end + res, <=, priv->len);
952       priv->end += res;
953
954       g_object_unref (object);
955     }
956
957   /* Complete immediately, not in idle, since we're already
958    * in a mainloop callout
959    */
960   g_simple_async_result_complete (simple);
961   g_object_unref (simple);
962 }
963
964 static void
965 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
966                                          gssize                count,
967                                          int                   io_priority,
968                                          GCancellable         *cancellable,
969                                          GAsyncReadyCallback   callback,
970                                          gpointer              user_data)
971 {
972   GBufferedInputStreamPrivate *priv;
973   GInputStream *base_stream;
974   GSimpleAsyncResult *simple;
975   gsize in_buffer;
976
977   priv = stream->priv;
978
979   if (count == -1)
980     count = priv->len;
981
982   in_buffer = priv->end - priv->pos;
983
984   /* Never fill more than can fit in the buffer */
985   count = MIN (count, priv->len - in_buffer);
986
987   /* If requested length does not fit at end, compact */
988   if (priv->len - priv->end < count)
989     compact_buffer (stream);
990
991   simple = g_simple_async_result_new (G_OBJECT (stream),
992                                       callback, user_data,
993                                       g_buffered_input_stream_real_fill_async);
994
995   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
996   g_input_stream_read_async (base_stream,
997                              priv->buffer + priv->end,
998                              count,
999                              io_priority,
1000                              cancellable,
1001                              fill_async_callback,
1002                              simple);
1003 }
1004
1005 static gssize
1006 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1007                                           GAsyncResult         *result,
1008                                           GError              **error)
1009 {
1010   GSimpleAsyncResult *simple;
1011   gssize nread;
1012
1013   simple = G_SIMPLE_ASYNC_RESULT (result);
1014   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1015
1016   nread = g_simple_async_result_get_op_res_gssize (simple);
1017   return nread;
1018 }
1019
1020 typedef struct
1021 {
1022   gssize bytes_read;
1023   gssize count;
1024   void *buffer;
1025 } ReadAsyncData;
1026
1027 static void
1028 free_read_async_data (gpointer _data)
1029 {
1030   ReadAsyncData *data = _data;
1031   g_slice_free (ReadAsyncData, data);
1032 }
1033
1034 static void
1035 large_read_callback (GObject *source_object,
1036                      GAsyncResult *result,
1037                      gpointer user_data)
1038 {
1039   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1040   ReadAsyncData *data;
1041   GError *error;
1042   gssize nread;
1043
1044   data = g_simple_async_result_get_op_res_gpointer (simple);
1045
1046   error = NULL;
1047   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1048                                       result, &error);
1049
1050   /* Only report the error if we've not already read some data */
1051   if (nread < 0 && data->bytes_read == 0)
1052     g_simple_async_result_take_error (simple, error);
1053   else if (error)
1054     g_error_free (error);
1055
1056   if (nread > 0)
1057     data->bytes_read += nread;
1058
1059   /* Complete immediately, not in idle, since we're already
1060    * in a mainloop callout
1061    */
1062   g_simple_async_result_complete (simple);
1063   g_object_unref (simple);
1064 }
1065
1066 static void
1067 read_fill_buffer_callback (GObject      *source_object,
1068                            GAsyncResult *result,
1069                            gpointer      user_data)
1070 {
1071   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1072   GBufferedInputStream *bstream;
1073   GBufferedInputStreamPrivate *priv;
1074   ReadAsyncData *data;
1075   GError *error;
1076   gssize nread;
1077   gsize available;
1078
1079   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1080   priv = bstream->priv;
1081
1082   data = g_simple_async_result_get_op_res_gpointer (simple);
1083
1084   error = NULL;
1085   nread = g_buffered_input_stream_fill_finish (bstream,
1086                                                result, &error);
1087
1088   if (nread < 0 && data->bytes_read == 0)
1089     g_simple_async_result_take_error (simple, error);
1090   else if (error)
1091     g_error_free (error);
1092
1093   if (nread > 0)
1094     {
1095       available = priv->end - priv->pos;
1096       data->count = MIN (data->count, available);
1097
1098       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1099       data->bytes_read += data->count;
1100       priv->pos += data->count;
1101     }
1102
1103   /* Complete immediately, not in idle, since we're already
1104    * in a mainloop callout
1105    */
1106   g_simple_async_result_complete (simple);
1107   g_object_unref (simple);
1108 }
1109
1110 static void
1111 g_buffered_input_stream_read_async (GInputStream        *stream,
1112                                     void                *buffer,
1113                                     gsize                count,
1114                                     int                  io_priority,
1115                                     GCancellable        *cancellable,
1116                                     GAsyncReadyCallback  callback,
1117                                     gpointer             user_data)
1118 {
1119   GBufferedInputStream *bstream;
1120   GBufferedInputStreamPrivate *priv;
1121   GBufferedInputStreamClass *class;
1122   GInputStream *base_stream;
1123   gsize available;
1124   GSimpleAsyncResult *simple;
1125   ReadAsyncData *data;
1126
1127   bstream = G_BUFFERED_INPUT_STREAM (stream);
1128   priv = bstream->priv;
1129
1130   data = g_slice_new (ReadAsyncData);
1131   data->buffer = buffer;
1132   data->bytes_read = 0;
1133   simple = g_simple_async_result_new (G_OBJECT (stream),
1134                                       callback, user_data,
1135                                       g_buffered_input_stream_read_async);
1136   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1137
1138   available = priv->end - priv->pos;
1139
1140   if (count <= available)
1141     {
1142       memcpy (buffer, priv->buffer + priv->pos, count);
1143       priv->pos += count;
1144       data->bytes_read = count;
1145
1146       g_simple_async_result_complete_in_idle (simple);
1147       g_object_unref (simple);
1148       return;
1149     }
1150
1151
1152   /* Full request not available, read all currently available
1153    * and request refill for more
1154    */
1155
1156   memcpy (buffer, priv->buffer + priv->pos, available);
1157   priv->pos = 0;
1158   priv->end = 0;
1159
1160   count -= available;
1161
1162   data->bytes_read = available;
1163   data->count = count;
1164
1165   if (count > priv->len)
1166     {
1167       /* Large request, shortcut buffer */
1168
1169       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1170
1171       g_input_stream_read_async (base_stream,
1172                                  (char *)buffer + data->bytes_read,
1173                                  count,
1174                                  io_priority, cancellable,
1175                                  large_read_callback,
1176                                  simple);
1177     }
1178   else
1179     {
1180       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1181       class->fill_async (bstream, priv->len, io_priority, cancellable,
1182                          read_fill_buffer_callback, simple);
1183     }
1184 }
1185
1186 static gssize
1187 g_buffered_input_stream_read_finish (GInputStream   *stream,
1188                                      GAsyncResult   *result,
1189                                      GError        **error)
1190 {
1191   GSimpleAsyncResult *simple;
1192   ReadAsyncData *data;
1193
1194   simple = G_SIMPLE_ASYNC_RESULT (result);
1195
1196   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1197
1198   data = g_simple_async_result_get_op_res_gpointer (simple);
1199
1200   return data->bytes_read;
1201 }
1202
1203 typedef struct
1204 {
1205   gssize bytes_skipped;
1206   gssize count;
1207 } SkipAsyncData;
1208
1209 static void
1210 free_skip_async_data (gpointer _data)
1211 {
1212   SkipAsyncData *data = _data;
1213   g_slice_free (SkipAsyncData, data);
1214 }
1215
1216 static void
1217 large_skip_callback (GObject      *source_object,
1218                      GAsyncResult *result,
1219                      gpointer      user_data)
1220 {
1221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1222   SkipAsyncData *data;
1223   GError *error;
1224   gssize nread;
1225
1226   data = g_simple_async_result_get_op_res_gpointer (simple);
1227
1228   error = NULL;
1229   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1230                                       result, &error);
1231
1232   /* Only report the error if we've not already read some data */
1233   if (nread < 0 && data->bytes_skipped == 0)
1234     g_simple_async_result_take_error (simple, error);
1235   else if (error)
1236     g_error_free (error);
1237
1238   if (nread > 0)
1239     data->bytes_skipped += nread;
1240
1241   /* Complete immediately, not in idle, since we're already
1242    * in a mainloop callout
1243    */
1244   g_simple_async_result_complete (simple);
1245   g_object_unref (simple);
1246 }
1247
1248 static void
1249 skip_fill_buffer_callback (GObject      *source_object,
1250                            GAsyncResult *result,
1251                            gpointer      user_data)
1252 {
1253   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1254   GBufferedInputStream *bstream;
1255   GBufferedInputStreamPrivate *priv;
1256   SkipAsyncData *data;
1257   GError *error;
1258   gssize nread;
1259   gsize available;
1260
1261   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1262   priv = bstream->priv;
1263
1264   data = g_simple_async_result_get_op_res_gpointer (simple);
1265
1266   error = NULL;
1267   nread = g_buffered_input_stream_fill_finish (bstream,
1268                                                result, &error);
1269
1270   if (nread < 0 && data->bytes_skipped == 0)
1271     g_simple_async_result_take_error (simple, error);
1272   else if (error)
1273     g_error_free (error);
1274
1275   if (nread > 0)
1276     {
1277       available = priv->end - priv->pos;
1278       data->count = MIN (data->count, available);
1279
1280       data->bytes_skipped += data->count;
1281       priv->pos += data->count;
1282     }
1283
1284   /* Complete immediately, not in idle, since we're already
1285    * in a mainloop callout
1286    */
1287   g_simple_async_result_complete (simple);
1288   g_object_unref (simple);
1289 }
1290
1291 static void
1292 g_buffered_input_stream_skip_async (GInputStream        *stream,
1293                                     gsize                count,
1294                                     int                  io_priority,
1295                                     GCancellable        *cancellable,
1296                                     GAsyncReadyCallback  callback,
1297                                     gpointer             user_data)
1298 {
1299   GBufferedInputStream *bstream;
1300   GBufferedInputStreamPrivate *priv;
1301   GBufferedInputStreamClass *class;
1302   GInputStream *base_stream;
1303   gsize available;
1304   GSimpleAsyncResult *simple;
1305   SkipAsyncData *data;
1306
1307   bstream = G_BUFFERED_INPUT_STREAM (stream);
1308   priv = bstream->priv;
1309
1310   data = g_slice_new (SkipAsyncData);
1311   data->bytes_skipped = 0;
1312   simple = g_simple_async_result_new (G_OBJECT (stream),
1313                                       callback, user_data,
1314                                       g_buffered_input_stream_skip_async);
1315   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1316
1317   available = priv->end - priv->pos;
1318
1319   if (count <= available)
1320     {
1321       priv->pos += count;
1322       data->bytes_skipped = count;
1323
1324       g_simple_async_result_complete_in_idle (simple);
1325       g_object_unref (simple);
1326       return;
1327     }
1328
1329   /* Full request not available, skip all currently available
1330    * and request refill for more
1331    */
1332
1333   priv->pos = 0;
1334   priv->end = 0;
1335
1336   count -= available;
1337
1338   data->bytes_skipped = available;
1339   data->count = count;
1340
1341   if (count > priv->len)
1342     {
1343       /* Large request, shortcut buffer */
1344
1345       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1346
1347       g_input_stream_skip_async (base_stream,
1348                                  count,
1349                                  io_priority, cancellable,
1350                                  large_skip_callback,
1351                                  simple);
1352     }
1353   else
1354     {
1355       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1356       class->fill_async (bstream, priv->len, io_priority, cancellable,
1357                          skip_fill_buffer_callback, simple);
1358     }
1359 }
1360
1361 static gssize
1362 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1363                                      GAsyncResult   *result,
1364                                      GError        **error)
1365 {
1366   GSimpleAsyncResult *simple;
1367   SkipAsyncData *data;
1368
1369   simple = G_SIMPLE_ASYNC_RESULT (result);
1370
1371   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1372
1373   data = g_simple_async_result_get_op_res_gpointer (simple);
1374
1375   return data->bytes_skipped;
1376 }