f0f91249476928a85630ab480b07404ae0a70589
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34
35 /**
36  * SECTION:gbufferedinputstream
37  * @short_description: Buffered Input Stream
38  * @include: gio/gio.h
39  * @see_also: #GFilterInputStream, #GInputStream
40  *
41  * Buffered input stream implements #GFilterInputStream and provides
42  * for buffered reads.
43  *
44  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45  *
46  * To create a buffered input stream, use g_buffered_input_stream_new(),
47  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
48  * construction.
49  *
50  * To get the size of a buffer within a buffered input stream, use
51  * g_buffered_input_stream_get_buffer_size(). To change the size of a
52  * buffered input stream's buffer, use
53  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
54  * cannot be reduced below the size of the data within the buffer.
55  */
56
57
/* Default value, in bytes, of the "buffer-size" property. */
#define DEFAULT_BUFFER_SIZE 4096

/* Private per-instance state of a GBufferedInputStream.
 *
 * The unread data always lives in buffer[pos .. end); the amount of
 * buffered data is therefore (end - pos).
 */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer; /* backing storage; g_malloc()ed by set_buffer_size, freed in finalize */
  gsize   len;    /* allocated size of @buffer, reported as the buffer size */
  gsize   pos;    /* offset in @buffer of the next unread byte */
  gsize   end;    /* offset in @buffer one past the last valid byte */
  GAsyncReadyCallback outstanding_callback; /* caller's callback for the fill_async in flight */
};

/* Property identifiers; PROP_0 is the unused zero slot. */
enum {
  PROP_0,
  PROP_BUFSIZE /* the "buffer-size" property */
};
72
/* GObject overrides (installed in class_init). */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream and GBufferedInputStream virtual method implementations
 * (installed in class_init).
 */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);

/* Moves the unread bytes to the start of the buffer. */
static void compact_buffer (GBufferedInputStream *stream);

/* Defines the GBufferedInputStream type with G_TYPE_FILTER_INPUT_STREAM as
 * its parent; g_buffered_input_stream_parent_class is used by finalize to
 * chain up.
 */
G_DEFINE_TYPE (GBufferedInputStream,
               g_buffered_input_stream,
               G_TYPE_FILTER_INPUT_STREAM)
132
133
134 static void
135 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
136 {
137   GObjectClass *object_class;
138   GInputStreamClass *istream_class;
139   GBufferedInputStreamClass *bstream_class;
140
141   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
142
143   object_class = G_OBJECT_CLASS (klass);
144   object_class->get_property = g_buffered_input_stream_get_property;
145   object_class->set_property = g_buffered_input_stream_set_property;
146   object_class->finalize     = g_buffered_input_stream_finalize;
147
148   istream_class = G_INPUT_STREAM_CLASS (klass);
149   istream_class->skip = g_buffered_input_stream_skip;
150   istream_class->skip_async  = g_buffered_input_stream_skip_async;
151   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
152   istream_class->read_fn = g_buffered_input_stream_read;
153   istream_class->read_async  = g_buffered_input_stream_read_async;
154   istream_class->read_finish = g_buffered_input_stream_read_finish;
155
156   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
157   bstream_class->fill = g_buffered_input_stream_real_fill;
158   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
159   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
160
161   g_object_class_install_property (object_class,
162                                    PROP_BUFSIZE,
163                                    g_param_spec_uint ("buffer-size",
164                                                       P_("Buffer Size"),
165                                                       P_("The size of the backend buffer"),
166                                                       1,
167                                                       G_MAXUINT,
168                                                       DEFAULT_BUFFER_SIZE,
169                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
170                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172
173 }
174
175 /**
176  * g_buffered_input_stream_get_buffer_size:
177  * @stream: a #GBufferedInputStream
178  *
179  * Gets the size of the input buffer.
180  *
181  * Returns: the current buffer size.
182  */
183 gsize
184 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_input_stream_set_buffer_size:
193  * @stream: a #GBufferedInputStream
194  * @size: a #gsize
195  *
196  * Sets the size of the internal buffer of @stream to @size, or to the
197  * size of the contents of the buffer. The buffer can never be resized
198  * smaller than its current contents.
199  */
200 void
201 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
202                                          gsize                 size)
203 {
204   GBufferedInputStreamPrivate *priv;
205   gsize in_buffer;
206   guint8 *buffer;
207
208   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
209
210   priv = stream->priv;
211
212   if (priv->len == size)
213     return;
214
215   if (priv->buffer)
216     {
217       in_buffer = priv->end - priv->pos;
218
219       /* Never resize smaller than current buffer contents */
220       size = MAX (size, in_buffer);
221
222       buffer = g_malloc (size);
223       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
224       priv->len = size;
225       priv->pos = 0;
226       priv->end = in_buffer;
227       g_free (priv->buffer);
228       priv->buffer = buffer;
229     }
230   else
231     {
232       priv->len = size;
233       priv->pos = 0;
234       priv->end = 0;
235       priv->buffer = g_malloc (size);
236     }
237
238   g_object_notify (G_OBJECT (stream), "buffer-size");
239 }
240
241 static void
242 g_buffered_input_stream_set_property (GObject      *object,
243                                       guint         prop_id,
244                                       const GValue *value,
245                                       GParamSpec   *pspec)
246 {
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250
251   switch (prop_id)
252     {
253     case PROP_BUFSIZE:
254       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
255       break;
256
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259       break;
260     }
261 }
262
263 static void
264 g_buffered_input_stream_get_property (GObject    *object,
265                                       guint       prop_id,
266                                       GValue     *value,
267                                       GParamSpec *pspec)
268 {
269   GBufferedInputStreamPrivate *priv;
270   GBufferedInputStream        *bstream;
271
272   bstream = G_BUFFERED_INPUT_STREAM (object);
273   priv = bstream->priv;
274
275   switch (prop_id)
276     {
277     case PROP_BUFSIZE:
278       g_value_set_uint (value, priv->len);
279       break;
280
281     default:
282       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283       break;
284     }
285 }
286
287 static void
288 g_buffered_input_stream_finalize (GObject *object)
289 {
290   GBufferedInputStreamPrivate *priv;
291   GBufferedInputStream        *stream;
292
293   stream = G_BUFFERED_INPUT_STREAM (object);
294   priv = stream->priv;
295
296   g_free (priv->buffer);
297
298   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
299 }
300
301 static void
302 g_buffered_input_stream_init (GBufferedInputStream *stream)
303 {
304   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
305                                               G_TYPE_BUFFERED_INPUT_STREAM,
306                                               GBufferedInputStreamPrivate);
307 }
308
309
310 /**
311  * g_buffered_input_stream_new:
312  * @base_stream: a #GInputStream
313  *
314  * Creates a new #GInputStream from the given @base_stream, with
315  * a buffer set to the default size (4 kilobytes).
316  *
317  * Returns: a #GInputStream for the given @base_stream.
318  */
319 GInputStream *
320 g_buffered_input_stream_new (GInputStream *base_stream)
321 {
322   GInputStream *stream;
323
324   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
325
326   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
327                          "base-stream", base_stream,
328                          NULL);
329
330   return stream;
331 }
332
333 /**
334  * g_buffered_input_stream_new_sized:
335  * @base_stream: a #GInputStream
336  * @size: a #gsize
337  *
338  * Creates a new #GBufferedInputStream from the given @base_stream,
339  * with a buffer set to @size.
340  *
341  * Returns: a #GInputStream.
342  */
343 GInputStream *
344 g_buffered_input_stream_new_sized (GInputStream *base_stream,
345                                    gsize         size)
346 {
347   GInputStream *stream;
348
349   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
350
351   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
352                          "base-stream", base_stream,
353                          "buffer-size", (guint)size,
354                          NULL);
355
356   return stream;
357 }
358
359 /**
360  * g_buffered_input_stream_fill:
361  * @stream: a #GBufferedInputStream
362  * @count: the number of bytes that will be read from the stream
363  * @cancellable: optional #GCancellable object, %NULL to ignore
364  * @error: location to store the error occuring, or %NULL to ignore
365  *
366  * Tries to read @count bytes from the stream into the buffer.
367  * Will block during this read.
368  *
369  * If @count is zero, returns zero and does nothing. A value of @count
370  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
371  *
372  * On success, the number of bytes read into the buffer is returned.
373  * It is not an error if this is not the same as the requested size, as it
374  * can happen e.g. near the end of a file. Zero is returned on end of file
375  * (or if @count is zero),  but never otherwise.
376  *
377  * If @count is -1 then the attempted read size is equal to the number of
378  * bytes that are required to fill the buffer.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  *
388  * For the asynchronous, non-blocking, version of this function, see
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count,
392  *     or -1 on error.
393  */
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405
406   input_stream = G_INPUT_STREAM (stream);
407
408   if (count < -1)
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
411                    _("Too large count value passed to %s"), G_STRFUNC);
412       return -1;
413     }
414
415   if (!g_input_stream_set_pending (input_stream, error))
416     return -1;
417
418   if (cancellable)
419     g_cancellable_push_current (cancellable);
420
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_cancellable_pop_current (cancellable);
426
427   g_input_stream_clear_pending (input_stream);
428
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: a #GBufferedInputStream
447  * @count: the number of bytes that will be read from the stream
448  * @io_priority: the <link linkend="io-priority">I/O priority</link>
449  *     of the request
450  * @cancellable: optional #GCancellable object
451  * @callback: a #GAsyncReadyCallback
452  * @user_data: a #gpointer
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  *
458  * If @count is -1 then the attempted read size is equal to the number
459  * of bytes that are required to fill the buffer.
460  */
461 void
462 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
463                                     gssize                count,
464                                     int                   io_priority,
465                                     GCancellable         *cancellable,
466                                     GAsyncReadyCallback   callback,
467                                     gpointer              user_data)
468 {
469   GBufferedInputStreamClass *class;
470   GSimpleAsyncResult *simple;
471   GError *error = NULL;
472
473   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
474
475   if (count == 0)
476     {
477       simple = g_simple_async_result_new (G_OBJECT (stream),
478                                           callback,
479                                           user_data,
480                                           g_buffered_input_stream_fill_async);
481       g_simple_async_result_complete_in_idle (simple);
482       g_object_unref (simple);
483       return;
484     }
485
486   if (count < -1)
487     {
488       g_simple_async_report_error_in_idle (G_OBJECT (stream),
489                                            callback,
490                                            user_data,
491                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
492                                            _("Too large count value passed to %s"),
493                                            G_STRFUNC);
494       return;
495     }
496
497   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
498     {
499       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
500                                             callback,
501                                             user_data,
502                                             error);
503       g_error_free (error);
504       return;
505     }
506
507   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
508
509   stream->priv->outstanding_callback = callback;
510   g_object_ref (stream);
511   class->fill_async (stream, count, io_priority, cancellable,
512                      async_fill_callback_wrapper, user_data);
513 }
514
515 /**
516  * g_buffered_input_stream_fill_finish:
517  * @stream: a #GBufferedInputStream
518  * @result: a #GAsyncResult
519  * @error: a #GError
520  *
521  * Finishes an asynchronous read.
522  *
523  * Returns: a #gssize of the read stream, or %-1 on an error.
524  */
525 gssize
526 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
527                                      GAsyncResult          *result,
528                                      GError               **error)
529 {
530   GSimpleAsyncResult *simple;
531   GBufferedInputStreamClass *class;
532
533   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
534   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
535
536   if (G_IS_SIMPLE_ASYNC_RESULT (result))
537     {
538       simple = G_SIMPLE_ASYNC_RESULT (result);
539       if (g_simple_async_result_propagate_error (simple, error))
540         return -1;
541
542       /* Special case read of 0 bytes */
543       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
544         return 0;
545     }
546
547   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
548   return class->fill_finish (stream, result, error);
549 }
550
551 /**
552  * g_buffered_input_stream_get_available:
553  * @stream: #GBufferedInputStream
554  *
555  * Gets the size of the available data within the stream.
556  *
557  * Returns: size of the available stream.
558  */
559 gsize
560 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
561 {
562   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
563
564   return stream->priv->end - stream->priv->pos;
565 }
566
567 /**
568  * g_buffered_input_stream_peek:
569  * @stream: a #GBufferedInputStream
570  * @buffer: a pointer to an allocated chunk of memory
571  * @offset: a #gsize
572  * @count: a #gsize
573  *
574  * Peeks in the buffer, copying data of size @count into @buffer,
575  * offset @offset bytes.
576  *
577  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
578  */
579 gsize
580 g_buffered_input_stream_peek (GBufferedInputStream *stream,
581                               void                 *buffer,
582                               gsize                 offset,
583                               gsize                 count)
584 {
585   gsize available;
586   gsize end;
587
588   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
589   g_return_val_if_fail (buffer != NULL, -1);
590
591   available = g_buffered_input_stream_get_available (stream);
592
593   if (offset > available)
594     return 0;
595
596   end = MIN (offset + count, available);
597   count = end - offset;
598
599   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
600   return count;
601 }
602
603 /**
604  * g_buffered_input_stream_peek_buffer:
605  * @stream: a #GBufferedInputStream
606  * @count: a #gsize to get the number of bytes available in the buffer
607  *
608  * Returns the buffer with the currently available bytes. The returned
609  * buffer must not be modified and will become invalid when reading from
610  * the stream or filling the buffer.
611  *
612  * Returns: read-only buffer
613  */
614 const void*
615 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
616                                      gsize                *count)
617 {
618   GBufferedInputStreamPrivate *priv;
619
620   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
621
622   priv = stream->priv;
623
624   if (count)
625     *count = priv->end - priv->pos;
626
627   return priv->buffer + priv->pos;
628 }
629
630 static void
631 compact_buffer (GBufferedInputStream *stream)
632 {
633   GBufferedInputStreamPrivate *priv;
634   gsize current_size;
635
636   priv = stream->priv;
637
638   current_size = priv->end - priv->pos;
639
640   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
641
642   priv->pos = 0;
643   priv->end = current_size;
644 }
645
646 static gssize
647 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
648                                    gssize                 count,
649                                    GCancellable          *cancellable,
650                                    GError               **error)
651 {
652   GBufferedInputStreamPrivate *priv;
653   GInputStream *base_stream;
654   gssize nread;
655   gsize in_buffer;
656
657   priv = stream->priv;
658
659   if (count == -1)
660     count = priv->len;
661
662   in_buffer = priv->end - priv->pos;
663
664   /* Never fill more than can fit in the buffer */
665   count = MIN (count, priv->len - in_buffer);
666
667   /* If requested length does not fit at end, compact */
668   if (priv->len - priv->end < count)
669     compact_buffer (stream);
670
671   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
672   nread = g_input_stream_read (base_stream,
673                                priv->buffer + priv->end,
674                                count,
675                                cancellable,
676                                error);
677
678   if (nread > 0)
679     priv->end += nread;
680
681   return nread;
682 }
683
684 static gssize
685 g_buffered_input_stream_skip (GInputStream  *stream,
686                               gsize          count,
687                               GCancellable  *cancellable,
688                               GError       **error)
689 {
690   GBufferedInputStream        *bstream;
691   GBufferedInputStreamPrivate *priv;
692   GBufferedInputStreamClass *class;
693   GInputStream *base_stream;
694   gsize available, bytes_skipped;
695   gssize nread;
696
697   bstream = G_BUFFERED_INPUT_STREAM (stream);
698   priv = bstream->priv;
699
700   available = priv->end - priv->pos;
701
702   if (count <= available)
703     {
704       priv->pos += count;
705       return count;
706     }
707
708   /* Full request not available, skip all currently available and
709    * request refill for more
710    */
711
712   priv->pos = 0;
713   priv->end = 0;
714   bytes_skipped = available;
715   count -= available;
716
717   if (bytes_skipped > 0)
718     error = NULL; /* Ignore further errors if we already read some data */
719
720   if (count > priv->len)
721     {
722       /* Large request, shortcut buffer */
723
724       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
725
726       nread = g_input_stream_skip (base_stream,
727                                    count,
728                                    cancellable,
729                                    error);
730
731       if (nread < 0 && bytes_skipped == 0)
732         return -1;
733
734       if (nread > 0)
735         bytes_skipped += nread;
736
737       return bytes_skipped;
738     }
739
740   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
741   nread = class->fill (bstream, priv->len, cancellable, error);
742
743   if (nread < 0)
744     {
745       if (bytes_skipped == 0)
746         return -1;
747       else
748         return bytes_skipped;
749     }
750
751   available = priv->end - priv->pos;
752   count = MIN (count, available);
753
754   bytes_skipped += count;
755   priv->pos += count;
756
757   return bytes_skipped;
758 }
759
760 static gssize
761 g_buffered_input_stream_read (GInputStream *stream,
762                               void         *buffer,
763                               gsize         count,
764                               GCancellable *cancellable,
765                               GError      **error)
766 {
767   GBufferedInputStream        *bstream;
768   GBufferedInputStreamPrivate *priv;
769   GBufferedInputStreamClass *class;
770   GInputStream *base_stream;
771   gsize available, bytes_read;
772   gssize nread;
773
774   bstream = G_BUFFERED_INPUT_STREAM (stream);
775   priv = bstream->priv;
776
777   available = priv->end - priv->pos;
778
779   if (count <= available)
780     {
781       memcpy (buffer, priv->buffer + priv->pos, count);
782       priv->pos += count;
783       return count;
784     }
785
786   /* Full request not available, read all currently available and
787    * request refill for more
788    */
789
790   memcpy (buffer, priv->buffer + priv->pos, available);
791   priv->pos = 0;
792   priv->end = 0;
793   bytes_read = available;
794   count -= available;
795
796   if (bytes_read > 0)
797     error = NULL; /* Ignore further errors if we already read some data */
798
799   if (count > priv->len)
800     {
801       /* Large request, shortcut buffer */
802
803       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
804
805       nread = g_input_stream_read (base_stream,
806                                    (char *)buffer + bytes_read,
807                                    count,
808                                    cancellable,
809                                    error);
810
811       if (nread < 0 && bytes_read == 0)
812         return -1;
813
814       if (nread > 0)
815         bytes_read += nread;
816
817       return bytes_read;
818     }
819
820   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
821   nread = class->fill (bstream, priv->len, cancellable, error);
822   if (nread < 0)
823     {
824       if (bytes_read == 0)
825         return -1;
826       else
827         return bytes_read;
828     }
829
830   available = priv->end - priv->pos;
831   count = MIN (count, available);
832
833   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
834   bytes_read += count;
835   priv->pos += count;
836
837   return bytes_read;
838 }
839
840 /**
841  * g_buffered_input_stream_read_byte:
842  * @stream: a #GBufferedInputStream
843  * @cancellable: optional #GCancellable object, %NULL to ignore
844  * @error: location to store the error occuring, or %NULL to ignore
845  *
846  * Tries to read a single byte from the stream or the buffer. Will block
847  * during this read.
848  *
849  * On success, the byte read from the stream is returned. On end of stream
850  * -1 is returned but it's not an exceptional error and @error is not set.
851  *
852  * If @cancellable is not %NULL, then the operation can be cancelled by
853  * triggering the cancellable object from another thread. If the operation
854  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
855  * operation was partially finished when the operation was cancelled the
856  * partial result will be returned, without an error.
857  *
858  * On error -1 is returned and @error is set accordingly.
859  *
860  * Returns: the byte read from the @stream, or -1 on end of stream or error.
861  */
862 int
863 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
864                                    GCancellable          *cancellable,
865                                    GError               **error)
866 {
867   GBufferedInputStreamPrivate *priv;
868   GBufferedInputStreamClass *class;
869   GInputStream *input_stream;
870   gsize available;
871   gssize nread;
872
873   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
874
875   priv = stream->priv;
876   input_stream = G_INPUT_STREAM (stream);
877
878   if (g_input_stream_is_closed (input_stream))
879     {
880       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
881                            _("Stream is already closed"));
882       return -1;
883     }
884
885   if (!g_input_stream_set_pending (input_stream, error))
886     return -1;
887
888   available = priv->end - priv->pos;
889
890   if (available != 0)
891     {
892       g_input_stream_clear_pending (input_stream);
893       return priv->buffer[priv->pos++];
894     }
895
896   /* Byte not available, request refill for more */
897
898   if (cancellable)
899     g_cancellable_push_current (cancellable);
900
901   priv->pos = 0;
902   priv->end = 0;
903
904   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
905   nread = class->fill (stream, priv->len, cancellable, error);
906
907   if (cancellable)
908     g_cancellable_pop_current (cancellable);
909
910   g_input_stream_clear_pending (input_stream);
911
912   if (nread <= 0)
913     return -1; /* error or end of stream */
914
915   return priv->buffer[priv->pos++];
916 }
917
918 /* ************************** */
919 /* Async stuff implementation */
920 /* ************************** */
921
922 static void
923 fill_async_callback (GObject      *source_object,
924                      GAsyncResult *result,
925                      gpointer      user_data)
926 {
927   GError *error;
928   gssize res;
929   GSimpleAsyncResult *simple;
930
931   simple = user_data;
932
933   error = NULL;
934   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
935                                     result, &error);
936
937   g_simple_async_result_set_op_res_gssize (simple, res);
938   if (res == -1)
939     {
940       g_simple_async_result_set_from_error (simple, error);
941       g_error_free (error);
942     }
943   else
944     {
945       GBufferedInputStreamPrivate *priv;
946       GObject *object;
947
948       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
949       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
950
951       g_assert_cmpint (priv->end + res, <=, priv->len);
952       priv->end += res;
953
954       g_object_unref (object);
955     }
956
957   /* Complete immediately, not in idle, since we're already
958    * in a mainloop callout
959    */
960   g_simple_async_result_complete (simple);
961   g_object_unref (simple);
962 }
963
964 static void
965 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
966                                          gssize                count,
967                                          int                   io_priority,
968                                          GCancellable         *cancellable,
969                                          GAsyncReadyCallback   callback,
970                                          gpointer              user_data)
971 {
972   GBufferedInputStreamPrivate *priv;
973   GInputStream *base_stream;
974   GSimpleAsyncResult *simple;
975   gsize in_buffer;
976
977   priv = stream->priv;
978
979   if (count == -1)
980     count = priv->len;
981
982   in_buffer = priv->end - priv->pos;
983
984   /* Never fill more than can fit in the buffer */
985   count = MIN (count, priv->len - in_buffer);
986
987   /* If requested length does not fit at end, compact */
988   if (priv->len - priv->end < count)
989     compact_buffer (stream);
990
991   simple = g_simple_async_result_new (G_OBJECT (stream),
992                                       callback, user_data,
993                                       g_buffered_input_stream_real_fill_async);
994
995   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
996   g_input_stream_read_async (base_stream,
997                              priv->buffer + priv->end,
998                              count,
999                              io_priority,
1000                              cancellable,
1001                              fill_async_callback,
1002                              simple);
1003 }
1004
1005 static gssize
1006 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1007                                           GAsyncResult         *result,
1008                                           GError              **error)
1009 {
1010   GSimpleAsyncResult *simple;
1011   gssize nread;
1012
1013   simple = G_SIMPLE_ASYNC_RESULT (result);
1014   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1015
1016   nread = g_simple_async_result_get_op_res_gssize (simple);
1017   return nread;
1018 }
1019
1020 typedef struct
1021 {
1022   gssize bytes_read;
1023   gssize count;
1024   void *buffer;
1025 } ReadAsyncData;
1026
1027 static void
1028 free_read_async_data (gpointer _data)
1029 {
1030   ReadAsyncData *data = _data;
1031   g_slice_free (ReadAsyncData, data);
1032 }
1033
1034 static void
1035 large_read_callback (GObject *source_object,
1036                      GAsyncResult *result,
1037                      gpointer user_data)
1038 {
1039   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1040   ReadAsyncData *data;
1041   GError *error;
1042   gssize nread;
1043
1044   data = g_simple_async_result_get_op_res_gpointer (simple);
1045
1046   error = NULL;
1047   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1048                                       result, &error);
1049
1050   /* Only report the error if we've not already read some data */
1051   if (nread < 0 && data->bytes_read == 0)
1052     g_simple_async_result_set_from_error (simple, error);
1053
1054   if (nread > 0)
1055     data->bytes_read += nread;
1056
1057   if (error)
1058     g_error_free (error);
1059
1060   /* Complete immediately, not in idle, since we're already
1061    * in a mainloop callout
1062    */
1063   g_simple_async_result_complete (simple);
1064   g_object_unref (simple);
1065 }
1066
1067 static void
1068 read_fill_buffer_callback (GObject      *source_object,
1069                            GAsyncResult *result,
1070                            gpointer      user_data)
1071 {
1072   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1073   GBufferedInputStream *bstream;
1074   GBufferedInputStreamPrivate *priv;
1075   ReadAsyncData *data;
1076   GError *error;
1077   gssize nread;
1078   gsize available;
1079
1080   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1081   priv = bstream->priv;
1082
1083   data = g_simple_async_result_get_op_res_gpointer (simple);
1084
1085   error = NULL;
1086   nread = g_buffered_input_stream_fill_finish (bstream,
1087                                                result, &error);
1088
1089   if (nread < 0 && data->bytes_read == 0)
1090     g_simple_async_result_set_from_error (simple, error);
1091
1092
1093   if (nread > 0)
1094     {
1095       available = priv->end - priv->pos;
1096       data->count = MIN (data->count, available);
1097
1098       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1099       data->bytes_read += data->count;
1100       priv->pos += data->count;
1101     }
1102
1103   if (error)
1104     g_error_free (error);
1105
1106   /* Complete immediately, not in idle, since we're already
1107    * in a mainloop callout
1108    */
1109   g_simple_async_result_complete (simple);
1110   g_object_unref (simple);
1111 }
1112
1113 static void
1114 g_buffered_input_stream_read_async (GInputStream        *stream,
1115                                     void                *buffer,
1116                                     gsize                count,
1117                                     int                  io_priority,
1118                                     GCancellable        *cancellable,
1119                                     GAsyncReadyCallback  callback,
1120                                     gpointer             user_data)
1121 {
1122   GBufferedInputStream *bstream;
1123   GBufferedInputStreamPrivate *priv;
1124   GBufferedInputStreamClass *class;
1125   GInputStream *base_stream;
1126   gsize available;
1127   GSimpleAsyncResult *simple;
1128   ReadAsyncData *data;
1129
1130   bstream = G_BUFFERED_INPUT_STREAM (stream);
1131   priv = bstream->priv;
1132
1133   data = g_slice_new (ReadAsyncData);
1134   data->buffer = buffer;
1135   data->bytes_read = 0;
1136   simple = g_simple_async_result_new (G_OBJECT (stream),
1137                                       callback, user_data,
1138                                       g_buffered_input_stream_read_async);
1139   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1140
1141   available = priv->end - priv->pos;
1142
1143   if (count <= available)
1144     {
1145       memcpy (buffer, priv->buffer + priv->pos, count);
1146       priv->pos += count;
1147       data->bytes_read = count;
1148
1149       g_simple_async_result_complete_in_idle (simple);
1150       g_object_unref (simple);
1151       return;
1152     }
1153
1154
1155   /* Full request not available, read all currently available
1156    * and request refill for more
1157    */
1158
1159   memcpy (buffer, priv->buffer + priv->pos, available);
1160   priv->pos = 0;
1161   priv->end = 0;
1162
1163   count -= available;
1164
1165   data->bytes_read = available;
1166   data->count = count;
1167
1168   if (count > priv->len)
1169     {
1170       /* Large request, shortcut buffer */
1171
1172       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1173
1174       g_input_stream_read_async (base_stream,
1175                                  (char *)buffer + data->bytes_read,
1176                                  count,
1177                                  io_priority, cancellable,
1178                                  large_read_callback,
1179                                  simple);
1180     }
1181   else
1182     {
1183       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1184       class->fill_async (bstream, priv->len, io_priority, cancellable,
1185                          read_fill_buffer_callback, simple);
1186     }
1187 }
1188
1189 static gssize
1190 g_buffered_input_stream_read_finish (GInputStream   *stream,
1191                                      GAsyncResult   *result,
1192                                      GError        **error)
1193 {
1194   GSimpleAsyncResult *simple;
1195   ReadAsyncData *data;
1196
1197   simple = G_SIMPLE_ASYNC_RESULT (result);
1198
1199   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1200
1201   data = g_simple_async_result_get_op_res_gpointer (simple);
1202
1203   return data->bytes_read;
1204 }
1205
1206 typedef struct
1207 {
1208   gssize bytes_skipped;
1209   gssize count;
1210 } SkipAsyncData;
1211
1212 static void
1213 free_skip_async_data (gpointer _data)
1214 {
1215   SkipAsyncData *data = _data;
1216   g_slice_free (SkipAsyncData, data);
1217 }
1218
1219 static void
1220 large_skip_callback (GObject      *source_object,
1221                      GAsyncResult *result,
1222                      gpointer      user_data)
1223 {
1224   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1225   SkipAsyncData *data;
1226   GError *error;
1227   gssize nread;
1228
1229   data = g_simple_async_result_get_op_res_gpointer (simple);
1230
1231   error = NULL;
1232   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1233                                       result, &error);
1234
1235   /* Only report the error if we've not already read some data */
1236   if (nread < 0 && data->bytes_skipped == 0)
1237     g_simple_async_result_set_from_error (simple, error);
1238
1239   if (nread > 0)
1240     data->bytes_skipped += nread;
1241
1242   if (error)
1243     g_error_free (error);
1244
1245   /* Complete immediately, not in idle, since we're already
1246    * in a mainloop callout
1247    */
1248   g_simple_async_result_complete (simple);
1249   g_object_unref (simple);
1250 }
1251
1252 static void
1253 skip_fill_buffer_callback (GObject      *source_object,
1254                            GAsyncResult *result,
1255                            gpointer      user_data)
1256 {
1257   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1258   GBufferedInputStream *bstream;
1259   GBufferedInputStreamPrivate *priv;
1260   SkipAsyncData *data;
1261   GError *error;
1262   gssize nread;
1263   gsize available;
1264
1265   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1266   priv = bstream->priv;
1267
1268   data = g_simple_async_result_get_op_res_gpointer (simple);
1269
1270   error = NULL;
1271   nread = g_buffered_input_stream_fill_finish (bstream,
1272                                                result, &error);
1273
1274   if (nread < 0 && data->bytes_skipped == 0)
1275     g_simple_async_result_set_from_error (simple, error);
1276
1277   if (nread > 0)
1278     {
1279       available = priv->end - priv->pos;
1280       data->count = MIN (data->count, available);
1281
1282       data->bytes_skipped += data->count;
1283       priv->pos += data->count;
1284     }
1285
1286   if (error)
1287     g_error_free (error);
1288
1289   /* Complete immediately, not in idle, since we're already
1290    * in a mainloop callout
1291    */
1292   g_simple_async_result_complete (simple);
1293   g_object_unref (simple);
1294 }
1295
1296 static void
1297 g_buffered_input_stream_skip_async (GInputStream        *stream,
1298                                     gsize                count,
1299                                     int                  io_priority,
1300                                     GCancellable        *cancellable,
1301                                     GAsyncReadyCallback  callback,
1302                                     gpointer             user_data)
1303 {
1304   GBufferedInputStream *bstream;
1305   GBufferedInputStreamPrivate *priv;
1306   GBufferedInputStreamClass *class;
1307   GInputStream *base_stream;
1308   gsize available;
1309   GSimpleAsyncResult *simple;
1310   SkipAsyncData *data;
1311
1312   bstream = G_BUFFERED_INPUT_STREAM (stream);
1313   priv = bstream->priv;
1314
1315   data = g_slice_new (SkipAsyncData);
1316   data->bytes_skipped = 0;
1317   simple = g_simple_async_result_new (G_OBJECT (stream),
1318                                       callback, user_data,
1319                                       g_buffered_input_stream_skip_async);
1320   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1321
1322   available = priv->end - priv->pos;
1323
1324   if (count <= available)
1325     {
1326       priv->pos += count;
1327       data->bytes_skipped = count;
1328
1329       g_simple_async_result_complete_in_idle (simple);
1330       g_object_unref (simple);
1331       return;
1332     }
1333
1334   /* Full request not available, skip all currently available
1335    * and request refill for more
1336    */
1337
1338   priv->pos = 0;
1339   priv->end = 0;
1340
1341   count -= available;
1342
1343   data->bytes_skipped = available;
1344   data->count = count;
1345
1346   if (count > priv->len)
1347     {
1348       /* Large request, shortcut buffer */
1349
1350       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1351
1352       g_input_stream_skip_async (base_stream,
1353                                  count,
1354                                  io_priority, cancellable,
1355                                  large_skip_callback,
1356                                  simple);
1357     }
1358   else
1359     {
1360       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1361       class->fill_async (bstream, priv->len, io_priority, cancellable,
1362                          skip_fill_buffer_callback, simple);
1363     }
1364 }
1365
1366 static gssize
1367 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1368                                      GAsyncResult   *result,
1369                                      GError        **error)
1370 {
1371   GSimpleAsyncResult *simple;
1372   SkipAsyncData *data;
1373
1374   simple = G_SIMPLE_ASYNC_RESULT (result);
1375
1376   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1377
1378   data = g_simple_async_result_get_op_res_gpointer (simple);
1379
1380   return data->bytes_skipped;
1381 }