Add g_simple_async_report_take_gerror_in_idle
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34
35 /**
36  * SECTION:gbufferedinputstream
37  * @short_description: Buffered Input Stream
38  * @include: gio/gio.h
39  * @see_also: #GFilterInputStream, #GInputStream
40  *
41  * Buffered input stream implements #GFilterInputStream and provides
42  * for buffered reads.
43  *
44  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45  *
46  * To create a buffered input stream, use g_buffered_input_stream_new(),
47  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
48  * construction.
49  *
50  * To get the size of a buffer within a buffered input stream, use
51  * g_buffered_input_stream_get_buffer_size(). To change the size of a
52  * buffered input stream's buffer, use
53  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
54  * cannot be reduced below the size of the data within the buffer.
55  */
56
57
58 #define DEFAULT_BUFFER_SIZE 4096
59
60 struct _GBufferedInputStreamPrivate {
61   guint8 *buffer;
62   gsize   len;
63   gsize   pos;
64   gsize   end;
65   GAsyncReadyCallback outstanding_callback;
66 };
67
/* GObject property ids. PROP_0 is never installed as a property. */
enum
{
  PROP_0,
  PROP_BUFSIZE  /* "buffer-size" */
};
72
73 static void g_buffered_input_stream_set_property  (GObject      *object,
74                                                    guint         prop_id,
75                                                    const GValue *value,
76                                                    GParamSpec   *pspec);
77
78 static void g_buffered_input_stream_get_property  (GObject      *object,
79                                                    guint         prop_id,
80                                                    GValue       *value,
81                                                    GParamSpec   *pspec);
82 static void g_buffered_input_stream_finalize      (GObject *object);
83
84
85 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
86                                                         gsize                  count,
87                                                         GCancellable          *cancellable,
88                                                         GError               **error);
89 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
90                                                         gsize                  count,
91                                                         int                    io_priority,
92                                                         GCancellable          *cancellable,
93                                                         GAsyncReadyCallback    callback,
94                                                         gpointer               user_data);
95 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
96                                                         GAsyncResult          *result,
97                                                         GError               **error);
98 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
99                                                         void                  *buffer,
100                                                         gsize                  count,
101                                                         GCancellable          *cancellable,
102                                                         GError               **error);
103 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
104                                                         void                  *buffer,
105                                                         gsize                  count,
106                                                         int                    io_priority,
107                                                         GCancellable          *cancellable,
108                                                         GAsyncReadyCallback    callback,
109                                                         gpointer               user_data);
110 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
111                                                         GAsyncResult          *result,
112                                                         GError               **error);
113 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
114                                                         gssize                 count,
115                                                         GCancellable          *cancellable,
116                                                         GError               **error);
117 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
118                                                         gssize                 count,
119                                                         int                    io_priority,
120                                                         GCancellable          *cancellable,
121                                                         GAsyncReadyCallback    callback,
122                                                         gpointer               user_data);
123 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
124                                                         GAsyncResult          *result,
125                                                         GError               **error);
126
127 static void compact_buffer (GBufferedInputStream *stream);
128
129 G_DEFINE_TYPE (GBufferedInputStream,
130                g_buffered_input_stream,
131                G_TYPE_FILTER_INPUT_STREAM)
132
133
134 static void
135 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
136 {
137   GObjectClass *object_class;
138   GInputStreamClass *istream_class;
139   GBufferedInputStreamClass *bstream_class;
140
141   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
142
143   object_class = G_OBJECT_CLASS (klass);
144   object_class->get_property = g_buffered_input_stream_get_property;
145   object_class->set_property = g_buffered_input_stream_set_property;
146   object_class->finalize     = g_buffered_input_stream_finalize;
147
148   istream_class = G_INPUT_STREAM_CLASS (klass);
149   istream_class->skip = g_buffered_input_stream_skip;
150   istream_class->skip_async  = g_buffered_input_stream_skip_async;
151   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
152   istream_class->read_fn = g_buffered_input_stream_read;
153   istream_class->read_async  = g_buffered_input_stream_read_async;
154   istream_class->read_finish = g_buffered_input_stream_read_finish;
155
156   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
157   bstream_class->fill = g_buffered_input_stream_real_fill;
158   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
159   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
160
161   g_object_class_install_property (object_class,
162                                    PROP_BUFSIZE,
163                                    g_param_spec_uint ("buffer-size",
164                                                       P_("Buffer Size"),
165                                                       P_("The size of the backend buffer"),
166                                                       1,
167                                                       G_MAXUINT,
168                                                       DEFAULT_BUFFER_SIZE,
169                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
170                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172
173 }
174
175 /**
176  * g_buffered_input_stream_get_buffer_size:
177  * @stream: a #GBufferedInputStream
178  *
179  * Gets the size of the input buffer.
180  *
181  * Returns: the current buffer size.
182  */
183 gsize
184 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_input_stream_set_buffer_size:
193  * @stream: a #GBufferedInputStream
194  * @size: a #gsize
195  *
196  * Sets the size of the internal buffer of @stream to @size, or to the
197  * size of the contents of the buffer. The buffer can never be resized
198  * smaller than its current contents.
199  */
200 void
201 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
202                                          gsize                 size)
203 {
204   GBufferedInputStreamPrivate *priv;
205   gsize in_buffer;
206   guint8 *buffer;
207
208   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
209
210   priv = stream->priv;
211
212   if (priv->len == size)
213     return;
214
215   if (priv->buffer)
216     {
217       in_buffer = priv->end - priv->pos;
218
219       /* Never resize smaller than current buffer contents */
220       size = MAX (size, in_buffer);
221
222       buffer = g_malloc (size);
223       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
224       priv->len = size;
225       priv->pos = 0;
226       priv->end = in_buffer;
227       g_free (priv->buffer);
228       priv->buffer = buffer;
229     }
230   else
231     {
232       priv->len = size;
233       priv->pos = 0;
234       priv->end = 0;
235       priv->buffer = g_malloc (size);
236     }
237
238   g_object_notify (G_OBJECT (stream), "buffer-size");
239 }
240
241 static void
242 g_buffered_input_stream_set_property (GObject      *object,
243                                       guint         prop_id,
244                                       const GValue *value,
245                                       GParamSpec   *pspec)
246 {
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250
251   switch (prop_id)
252     {
253     case PROP_BUFSIZE:
254       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
255       break;
256
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259       break;
260     }
261 }
262
263 static void
264 g_buffered_input_stream_get_property (GObject    *object,
265                                       guint       prop_id,
266                                       GValue     *value,
267                                       GParamSpec *pspec)
268 {
269   GBufferedInputStreamPrivate *priv;
270   GBufferedInputStream        *bstream;
271
272   bstream = G_BUFFERED_INPUT_STREAM (object);
273   priv = bstream->priv;
274
275   switch (prop_id)
276     {
277     case PROP_BUFSIZE:
278       g_value_set_uint (value, priv->len);
279       break;
280
281     default:
282       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283       break;
284     }
285 }
286
287 static void
288 g_buffered_input_stream_finalize (GObject *object)
289 {
290   GBufferedInputStreamPrivate *priv;
291   GBufferedInputStream        *stream;
292
293   stream = G_BUFFERED_INPUT_STREAM (object);
294   priv = stream->priv;
295
296   g_free (priv->buffer);
297
298   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
299 }
300
301 static void
302 g_buffered_input_stream_init (GBufferedInputStream *stream)
303 {
304   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
305                                               G_TYPE_BUFFERED_INPUT_STREAM,
306                                               GBufferedInputStreamPrivate);
307 }
308
309
310 /**
311  * g_buffered_input_stream_new:
312  * @base_stream: a #GInputStream
313  *
314  * Creates a new #GInputStream from the given @base_stream, with
315  * a buffer set to the default size (4 kilobytes).
316  *
317  * Returns: a #GInputStream for the given @base_stream.
318  */
319 GInputStream *
320 g_buffered_input_stream_new (GInputStream *base_stream)
321 {
322   GInputStream *stream;
323
324   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
325
326   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
327                          "base-stream", base_stream,
328                          NULL);
329
330   return stream;
331 }
332
333 /**
334  * g_buffered_input_stream_new_sized:
335  * @base_stream: a #GInputStream
336  * @size: a #gsize
337  *
338  * Creates a new #GBufferedInputStream from the given @base_stream,
339  * with a buffer set to @size.
340  *
341  * Returns: a #GInputStream.
342  */
343 GInputStream *
344 g_buffered_input_stream_new_sized (GInputStream *base_stream,
345                                    gsize         size)
346 {
347   GInputStream *stream;
348
349   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
350
351   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
352                          "base-stream", base_stream,
353                          "buffer-size", (guint)size,
354                          NULL);
355
356   return stream;
357 }
358
359 /**
360  * g_buffered_input_stream_fill:
361  * @stream: a #GBufferedInputStream
362  * @count: the number of bytes that will be read from the stream
363  * @cancellable: optional #GCancellable object, %NULL to ignore
364  * @error: location to store the error occuring, or %NULL to ignore
365  *
366  * Tries to read @count bytes from the stream into the buffer.
367  * Will block during this read.
368  *
369  * If @count is zero, returns zero and does nothing. A value of @count
370  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
371  *
372  * On success, the number of bytes read into the buffer is returned.
373  * It is not an error if this is not the same as the requested size, as it
374  * can happen e.g. near the end of a file. Zero is returned on end of file
375  * (or if @count is zero),  but never otherwise.
376  *
377  * If @count is -1 then the attempted read size is equal to the number of
378  * bytes that are required to fill the buffer.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  *
388  * For the asynchronous, non-blocking, version of this function, see
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count,
392  *     or -1 on error.
393  */
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405
406   input_stream = G_INPUT_STREAM (stream);
407
408   if (count < -1)
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
411                    _("Too large count value passed to %s"), G_STRFUNC);
412       return -1;
413     }
414
415   if (!g_input_stream_set_pending (input_stream, error))
416     return -1;
417
418   if (cancellable)
419     g_cancellable_push_current (cancellable);
420
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_cancellable_pop_current (cancellable);
426
427   g_input_stream_clear_pending (input_stream);
428
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: a #GBufferedInputStream
447  * @count: the number of bytes that will be read from the stream
448  * @io_priority: the <link linkend="io-priority">I/O priority</link>
449  *     of the request
450  * @cancellable: optional #GCancellable object
451  * @callback: a #GAsyncReadyCallback
452  * @user_data: a #gpointer
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  *
458  * If @count is -1 then the attempted read size is equal to the number
459  * of bytes that are required to fill the buffer.
460  */
461 void
462 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
463                                     gssize                count,
464                                     int                   io_priority,
465                                     GCancellable         *cancellable,
466                                     GAsyncReadyCallback   callback,
467                                     gpointer              user_data)
468 {
469   GBufferedInputStreamClass *class;
470   GSimpleAsyncResult *simple;
471   GError *error = NULL;
472
473   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
474
475   if (count == 0)
476     {
477       simple = g_simple_async_result_new (G_OBJECT (stream),
478                                           callback,
479                                           user_data,
480                                           g_buffered_input_stream_fill_async);
481       g_simple_async_result_complete_in_idle (simple);
482       g_object_unref (simple);
483       return;
484     }
485
486   if (count < -1)
487     {
488       g_simple_async_report_error_in_idle (G_OBJECT (stream),
489                                            callback,
490                                            user_data,
491                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
492                                            _("Too large count value passed to %s"),
493                                            G_STRFUNC);
494       return;
495     }
496
497   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
498     {
499       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
500                                             callback,
501                                             user_data,
502                                             error);
503       return;
504     }
505
506   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
507
508   stream->priv->outstanding_callback = callback;
509   g_object_ref (stream);
510   class->fill_async (stream, count, io_priority, cancellable,
511                      async_fill_callback_wrapper, user_data);
512 }
513
514 /**
515  * g_buffered_input_stream_fill_finish:
516  * @stream: a #GBufferedInputStream
517  * @result: a #GAsyncResult
518  * @error: a #GError
519  *
520  * Finishes an asynchronous read.
521  *
522  * Returns: a #gssize of the read stream, or %-1 on an error.
523  */
524 gssize
525 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
526                                      GAsyncResult          *result,
527                                      GError               **error)
528 {
529   GSimpleAsyncResult *simple;
530   GBufferedInputStreamClass *class;
531
532   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
533   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
534
535   if (G_IS_SIMPLE_ASYNC_RESULT (result))
536     {
537       simple = G_SIMPLE_ASYNC_RESULT (result);
538       if (g_simple_async_result_propagate_error (simple, error))
539         return -1;
540
541       /* Special case read of 0 bytes */
542       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
543         return 0;
544     }
545
546   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
547   return class->fill_finish (stream, result, error);
548 }
549
550 /**
551  * g_buffered_input_stream_get_available:
552  * @stream: #GBufferedInputStream
553  *
554  * Gets the size of the available data within the stream.
555  *
556  * Returns: size of the available stream.
557  */
558 gsize
559 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
560 {
561   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
562
563   return stream->priv->end - stream->priv->pos;
564 }
565
566 /**
567  * g_buffered_input_stream_peek:
568  * @stream: a #GBufferedInputStream
569  * @buffer: a pointer to an allocated chunk of memory
570  * @offset: a #gsize
571  * @count: a #gsize
572  *
573  * Peeks in the buffer, copying data of size @count into @buffer,
574  * offset @offset bytes.
575  *
576  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
577  */
578 gsize
579 g_buffered_input_stream_peek (GBufferedInputStream *stream,
580                               void                 *buffer,
581                               gsize                 offset,
582                               gsize                 count)
583 {
584   gsize available;
585   gsize end;
586
587   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
588   g_return_val_if_fail (buffer != NULL, -1);
589
590   available = g_buffered_input_stream_get_available (stream);
591
592   if (offset > available)
593     return 0;
594
595   end = MIN (offset + count, available);
596   count = end - offset;
597
598   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
599   return count;
600 }
601
602 /**
603  * g_buffered_input_stream_peek_buffer:
604  * @stream: a #GBufferedInputStream
605  * @count: a #gsize to get the number of bytes available in the buffer
606  *
607  * Returns the buffer with the currently available bytes. The returned
608  * buffer must not be modified and will become invalid when reading from
609  * the stream or filling the buffer.
610  *
611  * Returns: read-only buffer
612  */
613 const void*
614 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
615                                      gsize                *count)
616 {
617   GBufferedInputStreamPrivate *priv;
618
619   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
620
621   priv = stream->priv;
622
623   if (count)
624     *count = priv->end - priv->pos;
625
626   return priv->buffer + priv->pos;
627 }
628
629 static void
630 compact_buffer (GBufferedInputStream *stream)
631 {
632   GBufferedInputStreamPrivate *priv;
633   gsize current_size;
634
635   priv = stream->priv;
636
637   current_size = priv->end - priv->pos;
638
639   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
640
641   priv->pos = 0;
642   priv->end = current_size;
643 }
644
645 static gssize
646 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
647                                    gssize                 count,
648                                    GCancellable          *cancellable,
649                                    GError               **error)
650 {
651   GBufferedInputStreamPrivate *priv;
652   GInputStream *base_stream;
653   gssize nread;
654   gsize in_buffer;
655
656   priv = stream->priv;
657
658   if (count == -1)
659     count = priv->len;
660
661   in_buffer = priv->end - priv->pos;
662
663   /* Never fill more than can fit in the buffer */
664   count = MIN (count, priv->len - in_buffer);
665
666   /* If requested length does not fit at end, compact */
667   if (priv->len - priv->end < count)
668     compact_buffer (stream);
669
670   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
671   nread = g_input_stream_read (base_stream,
672                                priv->buffer + priv->end,
673                                count,
674                                cancellable,
675                                error);
676
677   if (nread > 0)
678     priv->end += nread;
679
680   return nread;
681 }
682
683 static gssize
684 g_buffered_input_stream_skip (GInputStream  *stream,
685                               gsize          count,
686                               GCancellable  *cancellable,
687                               GError       **error)
688 {
689   GBufferedInputStream        *bstream;
690   GBufferedInputStreamPrivate *priv;
691   GBufferedInputStreamClass *class;
692   GInputStream *base_stream;
693   gsize available, bytes_skipped;
694   gssize nread;
695
696   bstream = G_BUFFERED_INPUT_STREAM (stream);
697   priv = bstream->priv;
698
699   available = priv->end - priv->pos;
700
701   if (count <= available)
702     {
703       priv->pos += count;
704       return count;
705     }
706
707   /* Full request not available, skip all currently available and
708    * request refill for more
709    */
710
711   priv->pos = 0;
712   priv->end = 0;
713   bytes_skipped = available;
714   count -= available;
715
716   if (bytes_skipped > 0)
717     error = NULL; /* Ignore further errors if we already read some data */
718
719   if (count > priv->len)
720     {
721       /* Large request, shortcut buffer */
722
723       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
724
725       nread = g_input_stream_skip (base_stream,
726                                    count,
727                                    cancellable,
728                                    error);
729
730       if (nread < 0 && bytes_skipped == 0)
731         return -1;
732
733       if (nread > 0)
734         bytes_skipped += nread;
735
736       return bytes_skipped;
737     }
738
739   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
740   nread = class->fill (bstream, priv->len, cancellable, error);
741
742   if (nread < 0)
743     {
744       if (bytes_skipped == 0)
745         return -1;
746       else
747         return bytes_skipped;
748     }
749
750   available = priv->end - priv->pos;
751   count = MIN (count, available);
752
753   bytes_skipped += count;
754   priv->pos += count;
755
756   return bytes_skipped;
757 }
758
759 static gssize
760 g_buffered_input_stream_read (GInputStream *stream,
761                               void         *buffer,
762                               gsize         count,
763                               GCancellable *cancellable,
764                               GError      **error)
765 {
766   GBufferedInputStream        *bstream;
767   GBufferedInputStreamPrivate *priv;
768   GBufferedInputStreamClass *class;
769   GInputStream *base_stream;
770   gsize available, bytes_read;
771   gssize nread;
772
773   bstream = G_BUFFERED_INPUT_STREAM (stream);
774   priv = bstream->priv;
775
776   available = priv->end - priv->pos;
777
778   if (count <= available)
779     {
780       memcpy (buffer, priv->buffer + priv->pos, count);
781       priv->pos += count;
782       return count;
783     }
784
785   /* Full request not available, read all currently available and
786    * request refill for more
787    */
788
789   memcpy (buffer, priv->buffer + priv->pos, available);
790   priv->pos = 0;
791   priv->end = 0;
792   bytes_read = available;
793   count -= available;
794
795   if (bytes_read > 0)
796     error = NULL; /* Ignore further errors if we already read some data */
797
798   if (count > priv->len)
799     {
800       /* Large request, shortcut buffer */
801
802       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
803
804       nread = g_input_stream_read (base_stream,
805                                    (char *)buffer + bytes_read,
806                                    count,
807                                    cancellable,
808                                    error);
809
810       if (nread < 0 && bytes_read == 0)
811         return -1;
812
813       if (nread > 0)
814         bytes_read += nread;
815
816       return bytes_read;
817     }
818
819   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
820   nread = class->fill (bstream, priv->len, cancellable, error);
821   if (nread < 0)
822     {
823       if (bytes_read == 0)
824         return -1;
825       else
826         return bytes_read;
827     }
828
829   available = priv->end - priv->pos;
830   count = MIN (count, available);
831
832   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
833   bytes_read += count;
834   priv->pos += count;
835
836   return bytes_read;
837 }
838
839 /**
840  * g_buffered_input_stream_read_byte:
841  * @stream: a #GBufferedInputStream
842  * @cancellable: optional #GCancellable object, %NULL to ignore
843  * @error: location to store the error occuring, or %NULL to ignore
844  *
845  * Tries to read a single byte from the stream or the buffer. Will block
846  * during this read.
847  *
848  * On success, the byte read from the stream is returned. On end of stream
849  * -1 is returned but it's not an exceptional error and @error is not set.
850  *
851  * If @cancellable is not %NULL, then the operation can be cancelled by
852  * triggering the cancellable object from another thread. If the operation
853  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
854  * operation was partially finished when the operation was cancelled the
855  * partial result will be returned, without an error.
856  *
857  * On error -1 is returned and @error is set accordingly.
858  *
859  * Returns: the byte read from the @stream, or -1 on end of stream or error.
860  */
861 int
862 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
863                                    GCancellable          *cancellable,
864                                    GError               **error)
865 {
866   GBufferedInputStreamPrivate *priv;
867   GBufferedInputStreamClass *class;
868   GInputStream *input_stream;
869   gsize available;
870   gssize nread;
871
872   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
873
874   priv = stream->priv;
875   input_stream = G_INPUT_STREAM (stream);
876
877   if (g_input_stream_is_closed (input_stream))
878     {
879       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
880                            _("Stream is already closed"));
881       return -1;
882     }
883
884   if (!g_input_stream_set_pending (input_stream, error))
885     return -1;
886
887   available = priv->end - priv->pos;
888
889   if (available != 0)
890     {
891       g_input_stream_clear_pending (input_stream);
892       return priv->buffer[priv->pos++];
893     }
894
895   /* Byte not available, request refill for more */
896
897   if (cancellable)
898     g_cancellable_push_current (cancellable);
899
900   priv->pos = 0;
901   priv->end = 0;
902
903   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
904   nread = class->fill (stream, priv->len, cancellable, error);
905
906   if (cancellable)
907     g_cancellable_pop_current (cancellable);
908
909   g_input_stream_clear_pending (input_stream);
910
911   if (nread <= 0)
912     return -1; /* error or end of stream */
913
914   return priv->buffer[priv->pos++];
915 }
916
917 /* ************************** */
918 /* Async stuff implementation */
919 /* ************************** */
920
921 static void
922 fill_async_callback (GObject      *source_object,
923                      GAsyncResult *result,
924                      gpointer      user_data)
925 {
926   GError *error;
927   gssize res;
928   GSimpleAsyncResult *simple;
929
930   simple = user_data;
931
932   error = NULL;
933   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
934                                     result, &error);
935
936   g_simple_async_result_set_op_res_gssize (simple, res);
937   if (res == -1)
938     {
939       g_simple_async_result_take_error (simple, error);
940     }
941   else
942     {
943       GBufferedInputStreamPrivate *priv;
944       GObject *object;
945
946       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
947       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
948
949       g_assert_cmpint (priv->end + res, <=, priv->len);
950       priv->end += res;
951
952       g_object_unref (object);
953     }
954
955   /* Complete immediately, not in idle, since we're already
956    * in a mainloop callout
957    */
958   g_simple_async_result_complete (simple);
959   g_object_unref (simple);
960 }
961
962 static void
963 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
964                                          gssize                count,
965                                          int                   io_priority,
966                                          GCancellable         *cancellable,
967                                          GAsyncReadyCallback   callback,
968                                          gpointer              user_data)
969 {
970   GBufferedInputStreamPrivate *priv;
971   GInputStream *base_stream;
972   GSimpleAsyncResult *simple;
973   gsize in_buffer;
974
975   priv = stream->priv;
976
977   if (count == -1)
978     count = priv->len;
979
980   in_buffer = priv->end - priv->pos;
981
982   /* Never fill more than can fit in the buffer */
983   count = MIN (count, priv->len - in_buffer);
984
985   /* If requested length does not fit at end, compact */
986   if (priv->len - priv->end < count)
987     compact_buffer (stream);
988
989   simple = g_simple_async_result_new (G_OBJECT (stream),
990                                       callback, user_data,
991                                       g_buffered_input_stream_real_fill_async);
992
993   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
994   g_input_stream_read_async (base_stream,
995                              priv->buffer + priv->end,
996                              count,
997                              io_priority,
998                              cancellable,
999                              fill_async_callback,
1000                              simple);
1001 }
1002
1003 static gssize
1004 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1005                                           GAsyncResult         *result,
1006                                           GError              **error)
1007 {
1008   GSimpleAsyncResult *simple;
1009   gssize nread;
1010
1011   simple = G_SIMPLE_ASYNC_RESULT (result);
1012   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1013
1014   nread = g_simple_async_result_get_op_res_gssize (simple);
1015   return nread;
1016 }
1017
1018 typedef struct
1019 {
1020   gssize bytes_read;
1021   gssize count;
1022   void *buffer;
1023 } ReadAsyncData;
1024
1025 static void
1026 free_read_async_data (gpointer _data)
1027 {
1028   ReadAsyncData *data = _data;
1029   g_slice_free (ReadAsyncData, data);
1030 }
1031
1032 static void
1033 large_read_callback (GObject *source_object,
1034                      GAsyncResult *result,
1035                      gpointer user_data)
1036 {
1037   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1038   ReadAsyncData *data;
1039   GError *error;
1040   gssize nread;
1041
1042   data = g_simple_async_result_get_op_res_gpointer (simple);
1043
1044   error = NULL;
1045   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1046                                       result, &error);
1047
1048   /* Only report the error if we've not already read some data */
1049   if (nread < 0 && data->bytes_read == 0)
1050     g_simple_async_result_take_error (simple, error);
1051   else if (error)
1052     g_error_free (error);
1053
1054   if (nread > 0)
1055     data->bytes_read += nread;
1056
1057   /* Complete immediately, not in idle, since we're already
1058    * in a mainloop callout
1059    */
1060   g_simple_async_result_complete (simple);
1061   g_object_unref (simple);
1062 }
1063
1064 static void
1065 read_fill_buffer_callback (GObject      *source_object,
1066                            GAsyncResult *result,
1067                            gpointer      user_data)
1068 {
1069   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1070   GBufferedInputStream *bstream;
1071   GBufferedInputStreamPrivate *priv;
1072   ReadAsyncData *data;
1073   GError *error;
1074   gssize nread;
1075   gsize available;
1076
1077   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1078   priv = bstream->priv;
1079
1080   data = g_simple_async_result_get_op_res_gpointer (simple);
1081
1082   error = NULL;
1083   nread = g_buffered_input_stream_fill_finish (bstream,
1084                                                result, &error);
1085
1086   if (nread < 0 && data->bytes_read == 0)
1087     g_simple_async_result_take_error (simple, error);
1088   else if (error)
1089     g_error_free (error);
1090
1091   if (nread > 0)
1092     {
1093       available = priv->end - priv->pos;
1094       data->count = MIN (data->count, available);
1095
1096       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1097       data->bytes_read += data->count;
1098       priv->pos += data->count;
1099     }
1100
1101   /* Complete immediately, not in idle, since we're already
1102    * in a mainloop callout
1103    */
1104   g_simple_async_result_complete (simple);
1105   g_object_unref (simple);
1106 }
1107
1108 static void
1109 g_buffered_input_stream_read_async (GInputStream        *stream,
1110                                     void                *buffer,
1111                                     gsize                count,
1112                                     int                  io_priority,
1113                                     GCancellable        *cancellable,
1114                                     GAsyncReadyCallback  callback,
1115                                     gpointer             user_data)
1116 {
1117   GBufferedInputStream *bstream;
1118   GBufferedInputStreamPrivate *priv;
1119   GBufferedInputStreamClass *class;
1120   GInputStream *base_stream;
1121   gsize available;
1122   GSimpleAsyncResult *simple;
1123   ReadAsyncData *data;
1124
1125   bstream = G_BUFFERED_INPUT_STREAM (stream);
1126   priv = bstream->priv;
1127
1128   data = g_slice_new (ReadAsyncData);
1129   data->buffer = buffer;
1130   data->bytes_read = 0;
1131   simple = g_simple_async_result_new (G_OBJECT (stream),
1132                                       callback, user_data,
1133                                       g_buffered_input_stream_read_async);
1134   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1135
1136   available = priv->end - priv->pos;
1137
1138   if (count <= available)
1139     {
1140       memcpy (buffer, priv->buffer + priv->pos, count);
1141       priv->pos += count;
1142       data->bytes_read = count;
1143
1144       g_simple_async_result_complete_in_idle (simple);
1145       g_object_unref (simple);
1146       return;
1147     }
1148
1149
1150   /* Full request not available, read all currently available
1151    * and request refill for more
1152    */
1153
1154   memcpy (buffer, priv->buffer + priv->pos, available);
1155   priv->pos = 0;
1156   priv->end = 0;
1157
1158   count -= available;
1159
1160   data->bytes_read = available;
1161   data->count = count;
1162
1163   if (count > priv->len)
1164     {
1165       /* Large request, shortcut buffer */
1166
1167       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1168
1169       g_input_stream_read_async (base_stream,
1170                                  (char *)buffer + data->bytes_read,
1171                                  count,
1172                                  io_priority, cancellable,
1173                                  large_read_callback,
1174                                  simple);
1175     }
1176   else
1177     {
1178       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1179       class->fill_async (bstream, priv->len, io_priority, cancellable,
1180                          read_fill_buffer_callback, simple);
1181     }
1182 }
1183
1184 static gssize
1185 g_buffered_input_stream_read_finish (GInputStream   *stream,
1186                                      GAsyncResult   *result,
1187                                      GError        **error)
1188 {
1189   GSimpleAsyncResult *simple;
1190   ReadAsyncData *data;
1191
1192   simple = G_SIMPLE_ASYNC_RESULT (result);
1193
1194   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1195
1196   data = g_simple_async_result_get_op_res_gpointer (simple);
1197
1198   return data->bytes_read;
1199 }
1200
1201 typedef struct
1202 {
1203   gssize bytes_skipped;
1204   gssize count;
1205 } SkipAsyncData;
1206
1207 static void
1208 free_skip_async_data (gpointer _data)
1209 {
1210   SkipAsyncData *data = _data;
1211   g_slice_free (SkipAsyncData, data);
1212 }
1213
1214 static void
1215 large_skip_callback (GObject      *source_object,
1216                      GAsyncResult *result,
1217                      gpointer      user_data)
1218 {
1219   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1220   SkipAsyncData *data;
1221   GError *error;
1222   gssize nread;
1223
1224   data = g_simple_async_result_get_op_res_gpointer (simple);
1225
1226   error = NULL;
1227   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1228                                       result, &error);
1229
1230   /* Only report the error if we've not already read some data */
1231   if (nread < 0 && data->bytes_skipped == 0)
1232     g_simple_async_result_take_error (simple, error);
1233   else if (error)
1234     g_error_free (error);
1235
1236   if (nread > 0)
1237     data->bytes_skipped += nread;
1238
1239   /* Complete immediately, not in idle, since we're already
1240    * in a mainloop callout
1241    */
1242   g_simple_async_result_complete (simple);
1243   g_object_unref (simple);
1244 }
1245
1246 static void
1247 skip_fill_buffer_callback (GObject      *source_object,
1248                            GAsyncResult *result,
1249                            gpointer      user_data)
1250 {
1251   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1252   GBufferedInputStream *bstream;
1253   GBufferedInputStreamPrivate *priv;
1254   SkipAsyncData *data;
1255   GError *error;
1256   gssize nread;
1257   gsize available;
1258
1259   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1260   priv = bstream->priv;
1261
1262   data = g_simple_async_result_get_op_res_gpointer (simple);
1263
1264   error = NULL;
1265   nread = g_buffered_input_stream_fill_finish (bstream,
1266                                                result, &error);
1267
1268   if (nread < 0 && data->bytes_skipped == 0)
1269     g_simple_async_result_take_error (simple, error);
1270   else if (error)
1271     g_error_free (error);
1272
1273   if (nread > 0)
1274     {
1275       available = priv->end - priv->pos;
1276       data->count = MIN (data->count, available);
1277
1278       data->bytes_skipped += data->count;
1279       priv->pos += data->count;
1280     }
1281
1282   /* Complete immediately, not in idle, since we're already
1283    * in a mainloop callout
1284    */
1285   g_simple_async_result_complete (simple);
1286   g_object_unref (simple);
1287 }
1288
1289 static void
1290 g_buffered_input_stream_skip_async (GInputStream        *stream,
1291                                     gsize                count,
1292                                     int                  io_priority,
1293                                     GCancellable        *cancellable,
1294                                     GAsyncReadyCallback  callback,
1295                                     gpointer             user_data)
1296 {
1297   GBufferedInputStream *bstream;
1298   GBufferedInputStreamPrivate *priv;
1299   GBufferedInputStreamClass *class;
1300   GInputStream *base_stream;
1301   gsize available;
1302   GSimpleAsyncResult *simple;
1303   SkipAsyncData *data;
1304
1305   bstream = G_BUFFERED_INPUT_STREAM (stream);
1306   priv = bstream->priv;
1307
1308   data = g_slice_new (SkipAsyncData);
1309   data->bytes_skipped = 0;
1310   simple = g_simple_async_result_new (G_OBJECT (stream),
1311                                       callback, user_data,
1312                                       g_buffered_input_stream_skip_async);
1313   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1314
1315   available = priv->end - priv->pos;
1316
1317   if (count <= available)
1318     {
1319       priv->pos += count;
1320       data->bytes_skipped = count;
1321
1322       g_simple_async_result_complete_in_idle (simple);
1323       g_object_unref (simple);
1324       return;
1325     }
1326
1327   /* Full request not available, skip all currently available
1328    * and request refill for more
1329    */
1330
1331   priv->pos = 0;
1332   priv->end = 0;
1333
1334   count -= available;
1335
1336   data->bytes_skipped = available;
1337   data->count = count;
1338
1339   if (count > priv->len)
1340     {
1341       /* Large request, shortcut buffer */
1342
1343       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1344
1345       g_input_stream_skip_async (base_stream,
1346                                  count,
1347                                  io_priority, cancellable,
1348                                  large_skip_callback,
1349                                  simple);
1350     }
1351   else
1352     {
1353       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1354       class->fill_async (bstream, priv->len, io_priority, cancellable,
1355                          skip_fill_buffer_callback, simple);
1356     }
1357 }
1358
1359 static gssize
1360 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1361                                      GAsyncResult   *result,
1362                                      GError        **error)
1363 {
1364   GSimpleAsyncResult *simple;
1365   SkipAsyncData *data;
1366
1367   simple = G_SIMPLE_ASYNC_RESULT (result);
1368
1369   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1370
1371   data = g_simple_async_result_get_op_res_gpointer (simple);
1372
1373   return data->bytes_skipped;
1374 }