gio: Add g_async_result_legacy_propagate_error()
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gseekable.h"
31 #include "gioerror.h"
32 #include <string.h>
33 #include "glibintl.h"
34
35
36 /**
37  * SECTION:gbufferedinputstream
38  * @short_description: Buffered Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GFilterInputStream, #GInputStream
41  *
42  * Buffered input stream implements #GFilterInputStream and provides
43  * for buffered reads.
44  *
45  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46  *
47  * To create a buffered input stream, use g_buffered_input_stream_new(),
48  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49  * construction.
50  *
51  * To get the size of a buffer within a buffered input stream, use
52  * g_buffered_input_stream_get_buffer_size(). To change the size of a
53  * buffered input stream's buffer, use
54  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55  * cannot be reduced below the size of the data within the buffer.
56  */
57
58
59 #define DEFAULT_BUFFER_SIZE 4096
60
61 struct _GBufferedInputStreamPrivate {
62   guint8 *buffer;
63   gsize   len;
64   gsize   pos;
65   gsize   end;
66   GAsyncReadyCallback outstanding_callback;
67 };
68
69 enum {
70   PROP_0,
71   PROP_BUFSIZE
72 };
73
74 static void g_buffered_input_stream_set_property  (GObject      *object,
75                                                    guint         prop_id,
76                                                    const GValue *value,
77                                                    GParamSpec   *pspec);
78
79 static void g_buffered_input_stream_get_property  (GObject      *object,
80                                                    guint         prop_id,
81                                                    GValue       *value,
82                                                    GParamSpec   *pspec);
83 static void g_buffered_input_stream_finalize      (GObject *object);
84
85
86 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
87                                                         gsize                  count,
88                                                         GCancellable          *cancellable,
89                                                         GError               **error);
90 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
91                                                         gsize                  count,
92                                                         int                    io_priority,
93                                                         GCancellable          *cancellable,
94                                                         GAsyncReadyCallback    callback,
95                                                         gpointer               user_data);
96 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
97                                                         GAsyncResult          *result,
98                                                         GError               **error);
99 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
100                                                         void                  *buffer,
101                                                         gsize                  count,
102                                                         GCancellable          *cancellable,
103                                                         GError               **error);
104 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
105                                                         gssize                 count,
106                                                         GCancellable          *cancellable,
107                                                         GError               **error);
108 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
109                                                         gssize                 count,
110                                                         int                    io_priority,
111                                                         GCancellable          *cancellable,
112                                                         GAsyncReadyCallback    callback,
113                                                         gpointer               user_data);
114 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
115                                                         GAsyncResult          *result,
116                                                         GError               **error);
117
118 static void     g_buffered_input_stream_seekable_iface_init (GSeekableIface  *iface);
119 static goffset  g_buffered_input_stream_tell                (GSeekable       *seekable);
120 static gboolean g_buffered_input_stream_can_seek            (GSeekable       *seekable);
121 static gboolean g_buffered_input_stream_seek                (GSeekable       *seekable,
122                                                              goffset          offset,
123                                                              GSeekType        type,
124                                                              GCancellable    *cancellable,
125                                                              GError         **error);
126 static gboolean g_buffered_input_stream_can_truncate        (GSeekable       *seekable);
127 static gboolean g_buffered_input_stream_truncate            (GSeekable       *seekable,
128                                                              goffset          offset,
129                                                              GCancellable    *cancellable,
130                                                              GError         **error);
131
132 static void     g_buffered_input_stream_finalize            (GObject         *object);
133
134 static void compact_buffer (GBufferedInputStream *stream);
135
136 G_DEFINE_TYPE_WITH_CODE (GBufferedInputStream,
137                          g_buffered_input_stream,
138                          G_TYPE_FILTER_INPUT_STREAM,
139                          G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
140                                                 g_buffered_input_stream_seekable_iface_init))
141
142 static void
143 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
144 {
145   GObjectClass *object_class;
146   GInputStreamClass *istream_class;
147   GBufferedInputStreamClass *bstream_class;
148
149   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
150
151   object_class = G_OBJECT_CLASS (klass);
152   object_class->get_property = g_buffered_input_stream_get_property;
153   object_class->set_property = g_buffered_input_stream_set_property;
154   object_class->finalize     = g_buffered_input_stream_finalize;
155
156   istream_class = G_INPUT_STREAM_CLASS (klass);
157   istream_class->skip = g_buffered_input_stream_skip;
158   istream_class->skip_async  = g_buffered_input_stream_skip_async;
159   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
160   istream_class->read_fn = g_buffered_input_stream_read;
161
162   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
163   bstream_class->fill = g_buffered_input_stream_real_fill;
164   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
165   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
166
167   g_object_class_install_property (object_class,
168                                    PROP_BUFSIZE,
169                                    g_param_spec_uint ("buffer-size",
170                                                       P_("Buffer Size"),
171                                                       P_("The size of the backend buffer"),
172                                                       1,
173                                                       G_MAXUINT,
174                                                       DEFAULT_BUFFER_SIZE,
175                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
176                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
177
178
179 }
180
181 /**
182  * g_buffered_input_stream_get_buffer_size:
183  * @stream: a #GBufferedInputStream
184  *
185  * Gets the size of the input buffer.
186  *
187  * Returns: the current buffer size.
188  */
189 gsize
190 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
191 {
192   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
193
194   return stream->priv->len;
195 }
196
197 /**
198  * g_buffered_input_stream_set_buffer_size:
199  * @stream: a #GBufferedInputStream
200  * @size: a #gsize
201  *
202  * Sets the size of the internal buffer of @stream to @size, or to the
203  * size of the contents of the buffer. The buffer can never be resized
204  * smaller than its current contents.
205  */
206 void
207 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
208                                          gsize                 size)
209 {
210   GBufferedInputStreamPrivate *priv;
211   gsize in_buffer;
212   guint8 *buffer;
213
214   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
215
216   priv = stream->priv;
217
218   if (priv->len == size)
219     return;
220
221   if (priv->buffer)
222     {
223       in_buffer = priv->end - priv->pos;
224
225       /* Never resize smaller than current buffer contents */
226       size = MAX (size, in_buffer);
227
228       buffer = g_malloc (size);
229       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
230       priv->len = size;
231       priv->pos = 0;
232       priv->end = in_buffer;
233       g_free (priv->buffer);
234       priv->buffer = buffer;
235     }
236   else
237     {
238       priv->len = size;
239       priv->pos = 0;
240       priv->end = 0;
241       priv->buffer = g_malloc (size);
242     }
243
244   g_object_notify (G_OBJECT (stream), "buffer-size");
245 }
246
247 static void
248 g_buffered_input_stream_set_property (GObject      *object,
249                                       guint         prop_id,
250                                       const GValue *value,
251                                       GParamSpec   *pspec)
252 {
253   GBufferedInputStream        *bstream;
254
255   bstream = G_BUFFERED_INPUT_STREAM (object);
256
257   switch (prop_id)
258     {
259     case PROP_BUFSIZE:
260       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
261       break;
262
263     default:
264       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
265       break;
266     }
267 }
268
269 static void
270 g_buffered_input_stream_get_property (GObject    *object,
271                                       guint       prop_id,
272                                       GValue     *value,
273                                       GParamSpec *pspec)
274 {
275   GBufferedInputStreamPrivate *priv;
276   GBufferedInputStream        *bstream;
277
278   bstream = G_BUFFERED_INPUT_STREAM (object);
279   priv = bstream->priv;
280
281   switch (prop_id)
282     {
283     case PROP_BUFSIZE:
284       g_value_set_uint (value, priv->len);
285       break;
286
287     default:
288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
289       break;
290     }
291 }
292
293 static void
294 g_buffered_input_stream_finalize (GObject *object)
295 {
296   GBufferedInputStreamPrivate *priv;
297   GBufferedInputStream        *stream;
298
299   stream = G_BUFFERED_INPUT_STREAM (object);
300   priv = stream->priv;
301
302   g_free (priv->buffer);
303
304   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
305 }
306
307 static void
308 g_buffered_input_stream_seekable_iface_init (GSeekableIface *iface)
309 {
310   iface->tell         = g_buffered_input_stream_tell;
311   iface->can_seek     = g_buffered_input_stream_can_seek;
312   iface->seek         = g_buffered_input_stream_seek;
313   iface->can_truncate = g_buffered_input_stream_can_truncate;
314   iface->truncate_fn  = g_buffered_input_stream_truncate;
315 }
316
317 static void
318 g_buffered_input_stream_init (GBufferedInputStream *stream)
319 {
320   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
321                                               G_TYPE_BUFFERED_INPUT_STREAM,
322                                               GBufferedInputStreamPrivate);
323 }
324
325
326 /**
327  * g_buffered_input_stream_new:
328  * @base_stream: a #GInputStream
329  *
330  * Creates a new #GInputStream from the given @base_stream, with
331  * a buffer set to the default size (4 kilobytes).
332  *
333  * Returns: a #GInputStream for the given @base_stream.
334  */
335 GInputStream *
336 g_buffered_input_stream_new (GInputStream *base_stream)
337 {
338   GInputStream *stream;
339
340   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
341
342   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
343                          "base-stream", base_stream,
344                          NULL);
345
346   return stream;
347 }
348
349 /**
350  * g_buffered_input_stream_new_sized:
351  * @base_stream: a #GInputStream
352  * @size: a #gsize
353  *
354  * Creates a new #GBufferedInputStream from the given @base_stream,
355  * with a buffer set to @size.
356  *
357  * Returns: a #GInputStream.
358  */
359 GInputStream *
360 g_buffered_input_stream_new_sized (GInputStream *base_stream,
361                                    gsize         size)
362 {
363   GInputStream *stream;
364
365   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
366
367   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
368                          "base-stream", base_stream,
369                          "buffer-size", (guint)size,
370                          NULL);
371
372   return stream;
373 }
374
375 /**
376  * g_buffered_input_stream_fill:
377  * @stream: a #GBufferedInputStream
378  * @count: the number of bytes that will be read from the stream
379  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
380  * @error: location to store the error occurring, or %NULL to ignore
381  *
382  * Tries to read @count bytes from the stream into the buffer.
383  * Will block during this read.
384  *
385  * If @count is zero, returns zero and does nothing. A value of @count
386  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
387  *
388  * On success, the number of bytes read into the buffer is returned.
389  * It is not an error if this is not the same as the requested size, as it
390  * can happen e.g. near the end of a file. Zero is returned on end of file
391  * (or if @count is zero),  but never otherwise.
392  *
393  * If @count is -1 then the attempted read size is equal to the number of
394  * bytes that are required to fill the buffer.
395  *
396  * If @cancellable is not %NULL, then the operation can be cancelled by
397  * triggering the cancellable object from another thread. If the operation
398  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
399  * operation was partially finished when the operation was cancelled the
400  * partial result will be returned, without an error.
401  *
402  * On error -1 is returned and @error is set accordingly.
403  *
404  * For the asynchronous, non-blocking, version of this function, see
405  * g_buffered_input_stream_fill_async().
406  *
407  * Returns: the number of bytes read into @stream's buffer, up to @count,
408  *     or -1 on error.
409  */
410 gssize
411 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
412                               gssize                 count,
413                               GCancellable          *cancellable,
414                               GError               **error)
415 {
416   GBufferedInputStreamClass *class;
417   GInputStream *input_stream;
418   gssize res;
419
420   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
421
422   input_stream = G_INPUT_STREAM (stream);
423
424   if (count < -1)
425     {
426       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
427                    _("Too large count value passed to %s"), G_STRFUNC);
428       return -1;
429     }
430
431   if (!g_input_stream_set_pending (input_stream, error))
432     return -1;
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
438   res = class->fill (stream, count, cancellable, error);
439
440   if (cancellable)
441     g_cancellable_pop_current (cancellable);
442
443   g_input_stream_clear_pending (input_stream);
444
445   return res;
446 }
447
448 static void
449 async_fill_callback_wrapper (GObject      *source_object,
450                              GAsyncResult *res,
451                              gpointer      user_data)
452 {
453   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
454
455   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
456   (*stream->priv->outstanding_callback) (source_object, res, user_data);
457   g_object_unref (stream);
458 }
459
460 /**
461  * g_buffered_input_stream_fill_async:
462  * @stream: a #GBufferedInputStream
463  * @count: the number of bytes that will be read from the stream
464  * @io_priority: the <link linkend="io-priority">I/O priority</link>
465  *     of the request
466  * @cancellable: (allow-none): optional #GCancellable object
467  * @callback: (scope async): a #GAsyncReadyCallback
468  * @user_data: (closure): a #gpointer
469  *
470  * Reads data into @stream's buffer asynchronously, up to @count size.
471  * @io_priority can be used to prioritize reads. For the synchronous
472  * version of this function, see g_buffered_input_stream_fill().
473  *
474  * If @count is -1 then the attempted read size is equal to the number
475  * of bytes that are required to fill the buffer.
476  */
477 void
478 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
479                                     gssize                count,
480                                     int                   io_priority,
481                                     GCancellable         *cancellable,
482                                     GAsyncReadyCallback   callback,
483                                     gpointer              user_data)
484 {
485   GBufferedInputStreamClass *class;
486   GSimpleAsyncResult *simple;
487   GError *error = NULL;
488
489   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
490
491   if (count == 0)
492     {
493       simple = g_simple_async_result_new (G_OBJECT (stream),
494                                           callback,
495                                           user_data,
496                                           g_buffered_input_stream_fill_async);
497       g_simple_async_result_complete_in_idle (simple);
498       g_object_unref (simple);
499       return;
500     }
501
502   if (count < -1)
503     {
504       g_simple_async_report_error_in_idle (G_OBJECT (stream),
505                                            callback,
506                                            user_data,
507                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
508                                            _("Too large count value passed to %s"),
509                                            G_STRFUNC);
510       return;
511     }
512
513   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
514     {
515       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
516                                             callback,
517                                             user_data,
518                                             error);
519       return;
520     }
521
522   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
523
524   stream->priv->outstanding_callback = callback;
525   g_object_ref (stream);
526   class->fill_async (stream, count, io_priority, cancellable,
527                      async_fill_callback_wrapper, user_data);
528 }
529
530 /**
531  * g_buffered_input_stream_fill_finish:
532  * @stream: a #GBufferedInputStream
533  * @result: a #GAsyncResult
534  * @error: a #GError
535  *
536  * Finishes an asynchronous read.
537  *
538  * Returns: a #gssize of the read stream, or %-1 on an error.
539  */
540 gssize
541 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
542                                      GAsyncResult          *result,
543                                      GError               **error)
544 {
545   GSimpleAsyncResult *simple;
546   GBufferedInputStreamClass *class;
547
548   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
549   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
550
551   if (g_async_result_legacy_propagate_error (result, error))
552     return -1;
553
554   if (G_IS_SIMPLE_ASYNC_RESULT (result))
555     {
556       simple = G_SIMPLE_ASYNC_RESULT (result);
557
558       /* Special case read of 0 bytes */
559       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
560         return 0;
561     }
562
563   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
564   return class->fill_finish (stream, result, error);
565 }
566
567 /**
568  * g_buffered_input_stream_get_available:
569  * @stream: #GBufferedInputStream
570  *
571  * Gets the size of the available data within the stream.
572  *
573  * Returns: size of the available stream.
574  */
575 gsize
576 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
577 {
578   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
579
580   return stream->priv->end - stream->priv->pos;
581 }
582
583 /**
584  * g_buffered_input_stream_peek:
585  * @stream: a #GBufferedInputStream
586  * @buffer: (array length=count) (element-type guint8): a pointer to
587  *   an allocated chunk of memory
588  * @offset: a #gsize
589  * @count: a #gsize
590  *
591  * Peeks in the buffer, copying data of size @count into @buffer,
592  * offset @offset bytes.
593  *
594  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
595  */
596 gsize
597 g_buffered_input_stream_peek (GBufferedInputStream *stream,
598                               void                 *buffer,
599                               gsize                 offset,
600                               gsize                 count)
601 {
602   gsize available;
603   gsize end;
604
605   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
606   g_return_val_if_fail (buffer != NULL, -1);
607
608   available = g_buffered_input_stream_get_available (stream);
609
610   if (offset > available)
611     return 0;
612
613   end = MIN (offset + count, available);
614   count = end - offset;
615
616   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
617   return count;
618 }
619
620 /**
621  * g_buffered_input_stream_peek_buffer:
622  * @stream: a #GBufferedInputStream
623  * @count: (out): a #gsize to get the number of bytes available in the buffer
624  *
625  * Returns the buffer with the currently available bytes. The returned
626  * buffer must not be modified and will become invalid when reading from
627  * the stream or filling the buffer.
628  *
629  * Returns: (array length=count) (element-type guint8) (transfer none):
630  *          read-only buffer
631  */
632 const void*
633 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
634                                      gsize                *count)
635 {
636   GBufferedInputStreamPrivate *priv;
637
638   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
639
640   priv = stream->priv;
641
642   if (count)
643     *count = priv->end - priv->pos;
644
645   return priv->buffer + priv->pos;
646 }
647
648 static void
649 compact_buffer (GBufferedInputStream *stream)
650 {
651   GBufferedInputStreamPrivate *priv;
652   gsize current_size;
653
654   priv = stream->priv;
655
656   current_size = priv->end - priv->pos;
657
658   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
659
660   priv->pos = 0;
661   priv->end = current_size;
662 }
663
664 static gssize
665 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
666                                    gssize                 count,
667                                    GCancellable          *cancellable,
668                                    GError               **error)
669 {
670   GBufferedInputStreamPrivate *priv;
671   GInputStream *base_stream;
672   gssize nread;
673   gsize in_buffer;
674
675   priv = stream->priv;
676
677   if (count == -1)
678     count = priv->len;
679
680   in_buffer = priv->end - priv->pos;
681
682   /* Never fill more than can fit in the buffer */
683   count = MIN (count, priv->len - in_buffer);
684
685   /* If requested length does not fit at end, compact */
686   if (priv->len - priv->end < count)
687     compact_buffer (stream);
688
689   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
690   nread = g_input_stream_read (base_stream,
691                                priv->buffer + priv->end,
692                                count,
693                                cancellable,
694                                error);
695
696   if (nread > 0)
697     priv->end += nread;
698
699   return nread;
700 }
701
702 static gssize
703 g_buffered_input_stream_skip (GInputStream  *stream,
704                               gsize          count,
705                               GCancellable  *cancellable,
706                               GError       **error)
707 {
708   GBufferedInputStream        *bstream;
709   GBufferedInputStreamPrivate *priv;
710   GBufferedInputStreamClass *class;
711   GInputStream *base_stream;
712   gsize available, bytes_skipped;
713   gssize nread;
714
715   bstream = G_BUFFERED_INPUT_STREAM (stream);
716   priv = bstream->priv;
717
718   available = priv->end - priv->pos;
719
720   if (count <= available)
721     {
722       priv->pos += count;
723       return count;
724     }
725
726   /* Full request not available, skip all currently available and
727    * request refill for more
728    */
729
730   priv->pos = 0;
731   priv->end = 0;
732   bytes_skipped = available;
733   count -= available;
734
735   if (bytes_skipped > 0)
736     error = NULL; /* Ignore further errors if we already read some data */
737
738   if (count > priv->len)
739     {
740       /* Large request, shortcut buffer */
741
742       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
743
744       nread = g_input_stream_skip (base_stream,
745                                    count,
746                                    cancellable,
747                                    error);
748
749       if (nread < 0 && bytes_skipped == 0)
750         return -1;
751
752       if (nread > 0)
753         bytes_skipped += nread;
754
755       return bytes_skipped;
756     }
757
758   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
759   nread = class->fill (bstream, priv->len, cancellable, error);
760
761   if (nread < 0)
762     {
763       if (bytes_skipped == 0)
764         return -1;
765       else
766         return bytes_skipped;
767     }
768
769   available = priv->end - priv->pos;
770   count = MIN (count, available);
771
772   bytes_skipped += count;
773   priv->pos += count;
774
775   return bytes_skipped;
776 }
777
778 static gssize
779 g_buffered_input_stream_read (GInputStream *stream,
780                               void         *buffer,
781                               gsize         count,
782                               GCancellable *cancellable,
783                               GError      **error)
784 {
785   GBufferedInputStream        *bstream;
786   GBufferedInputStreamPrivate *priv;
787   GBufferedInputStreamClass *class;
788   GInputStream *base_stream;
789   gsize available, bytes_read;
790   gssize nread;
791
792   bstream = G_BUFFERED_INPUT_STREAM (stream);
793   priv = bstream->priv;
794
795   available = priv->end - priv->pos;
796
797   if (count <= available)
798     {
799       memcpy (buffer, priv->buffer + priv->pos, count);
800       priv->pos += count;
801       return count;
802     }
803
804   /* Full request not available, read all currently available and
805    * request refill for more
806    */
807
808   memcpy (buffer, priv->buffer + priv->pos, available);
809   priv->pos = 0;
810   priv->end = 0;
811   bytes_read = available;
812   count -= available;
813
814   if (bytes_read > 0)
815     error = NULL; /* Ignore further errors if we already read some data */
816
817   if (count > priv->len)
818     {
819       /* Large request, shortcut buffer */
820
821       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
822
823       nread = g_input_stream_read (base_stream,
824                                    (char *)buffer + bytes_read,
825                                    count,
826                                    cancellable,
827                                    error);
828
829       if (nread < 0 && bytes_read == 0)
830         return -1;
831
832       if (nread > 0)
833         bytes_read += nread;
834
835       return bytes_read;
836     }
837
838   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
839   nread = class->fill (bstream, priv->len, cancellable, error);
840   if (nread < 0)
841     {
842       if (bytes_read == 0)
843         return -1;
844       else
845         return bytes_read;
846     }
847
848   available = priv->end - priv->pos;
849   count = MIN (count, available);
850
851   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
852   bytes_read += count;
853   priv->pos += count;
854
855   return bytes_read;
856 }
857
858 static goffset
859 g_buffered_input_stream_tell (GSeekable *seekable)
860 {
861   GBufferedInputStream        *bstream;
862   GBufferedInputStreamPrivate *priv;
863   GInputStream *base_stream;
864   GSeekable    *base_stream_seekable;
865   gsize available;
866   goffset base_offset;
867   
868   bstream = G_BUFFERED_INPUT_STREAM (seekable);
869   priv = bstream->priv;
870
871   base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
872   if (!G_IS_SEEKABLE (base_stream))
873     return 0;
874   base_stream_seekable = G_SEEKABLE (base_stream);
875   
876   available = priv->end - priv->pos;
877   base_offset = g_seekable_tell (base_stream_seekable);
878
879   return base_offset - available;
880 }
881
882 static gboolean
883 g_buffered_input_stream_can_seek (GSeekable *seekable)
884 {
885   GInputStream *base_stream;
886   
887   base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
888   return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
889 }
890
891 static gboolean
892 g_buffered_input_stream_seek (GSeekable     *seekable,
893                               goffset        offset,
894                               GSeekType      type,
895                               GCancellable  *cancellable,
896                               GError       **error)
897 {
898   GBufferedInputStream        *bstream;
899   GBufferedInputStreamPrivate *priv;
900   GInputStream *base_stream;
901   GSeekable *base_stream_seekable;
902
903   bstream = G_BUFFERED_INPUT_STREAM (seekable);
904   priv = bstream->priv;
905
906   base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
907   if (!G_IS_SEEKABLE (base_stream))
908     {
909       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
910                            _("Seek not supported on base stream"));
911       return FALSE;
912     }
913
914   base_stream_seekable = G_SEEKABLE (base_stream);
915   
916   if (type == G_SEEK_CUR)
917     {
918       if (offset <= priv->end - priv->pos && offset >= -priv->pos)
919         {
920           priv->pos += offset;
921           return TRUE;
922         }
923       else
924         {
925           offset -= priv->end - priv->pos;
926         }
927     }
928
929   if (g_seekable_seek (base_stream_seekable, offset, type, cancellable, error))
930     {
931       priv->pos = 0;
932       priv->end = 0;
933       return TRUE;
934     }
935   else
936     {
937       return FALSE;
938     }
939 }
940
941 static gboolean
942 g_buffered_input_stream_can_truncate (GSeekable *seekable)
943 {
944   return FALSE;
945 }
946
947 static gboolean
948 g_buffered_input_stream_truncate (GSeekable     *seekable,
949                                   goffset        offset,
950                                   GCancellable  *cancellable,
951                                   GError       **error)
952 {
953   g_set_error_literal (error,
954                        G_IO_ERROR,
955                        G_IO_ERROR_NOT_SUPPORTED,
956                        _("Cannot truncate GBufferedInputStream"));
957   return FALSE;
958 }
959
960 /**
961  * g_buffered_input_stream_read_byte:
962  * @stream: a #GBufferedInputStream
963  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
964  * @error: location to store the error occurring, or %NULL to ignore
965  *
966  * Tries to read a single byte from the stream or the buffer. Will block
967  * during this read.
968  *
969  * On success, the byte read from the stream is returned. On end of stream
970  * -1 is returned but it's not an exceptional error and @error is not set.
971  *
972  * If @cancellable is not %NULL, then the operation can be cancelled by
973  * triggering the cancellable object from another thread. If the operation
974  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
975  * operation was partially finished when the operation was cancelled the
976  * partial result will be returned, without an error.
977  *
978  * On error -1 is returned and @error is set accordingly.
979  *
980  * Returns: the byte read from the @stream, or -1 on end of stream or error.
981  */
982 int
983 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
984                                    GCancellable          *cancellable,
985                                    GError               **error)
986 {
987   GBufferedInputStreamPrivate *priv;
988   GBufferedInputStreamClass *class;
989   GInputStream *input_stream;
990   gsize available;
991   gssize nread;
992
993   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
994
995   priv = stream->priv;
996   input_stream = G_INPUT_STREAM (stream);
997
998   if (g_input_stream_is_closed (input_stream))
999     {
1000       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1001                            _("Stream is already closed"));
1002       return -1;
1003     }
1004
1005   if (!g_input_stream_set_pending (input_stream, error))
1006     return -1;
1007
1008   available = priv->end - priv->pos;
1009
1010   if (available != 0)
1011     {
1012       g_input_stream_clear_pending (input_stream);
1013       return priv->buffer[priv->pos++];
1014     }
1015
1016   /* Byte not available, request refill for more */
1017
1018   if (cancellable)
1019     g_cancellable_push_current (cancellable);
1020
1021   priv->pos = 0;
1022   priv->end = 0;
1023
1024   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1025   nread = class->fill (stream, priv->len, cancellable, error);
1026
1027   if (cancellable)
1028     g_cancellable_pop_current (cancellable);
1029
1030   g_input_stream_clear_pending (input_stream);
1031
1032   if (nread <= 0)
1033     return -1; /* error or end of stream */
1034
1035   return priv->buffer[priv->pos++];
1036 }
1037
1038 /* ************************** */
1039 /* Async stuff implementation */
1040 /* ************************** */
1041
1042 static void
1043 fill_async_callback (GObject      *source_object,
1044                      GAsyncResult *result,
1045                      gpointer      user_data)
1046 {
1047   GError *error;
1048   gssize res;
1049   GSimpleAsyncResult *simple;
1050
1051   simple = user_data;
1052
1053   error = NULL;
1054   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1055                                     result, &error);
1056
1057   g_simple_async_result_set_op_res_gssize (simple, res);
1058   if (res == -1)
1059     {
1060       g_simple_async_result_take_error (simple, error);
1061     }
1062   else
1063     {
1064       GBufferedInputStreamPrivate *priv;
1065       GObject *object;
1066
1067       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
1068       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
1069
1070       g_assert_cmpint (priv->end + res, <=, priv->len);
1071       priv->end += res;
1072
1073       g_object_unref (object);
1074     }
1075
1076   /* Complete immediately, not in idle, since we're already
1077    * in a mainloop callout
1078    */
1079   g_simple_async_result_complete (simple);
1080   g_object_unref (simple);
1081 }
1082
1083 static void
1084 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
1085                                          gssize                count,
1086                                          int                   io_priority,
1087                                          GCancellable         *cancellable,
1088                                          GAsyncReadyCallback   callback,
1089                                          gpointer              user_data)
1090 {
1091   GBufferedInputStreamPrivate *priv;
1092   GInputStream *base_stream;
1093   GSimpleAsyncResult *simple;
1094   gsize in_buffer;
1095
1096   priv = stream->priv;
1097
1098   if (count == -1)
1099     count = priv->len;
1100
1101   in_buffer = priv->end - priv->pos;
1102
1103   /* Never fill more than can fit in the buffer */
1104   count = MIN (count, priv->len - in_buffer);
1105
1106   /* If requested length does not fit at end, compact */
1107   if (priv->len - priv->end < count)
1108     compact_buffer (stream);
1109
1110   simple = g_simple_async_result_new (G_OBJECT (stream),
1111                                       callback, user_data,
1112                                       g_buffered_input_stream_real_fill_async);
1113
1114   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1115   g_input_stream_read_async (base_stream,
1116                              priv->buffer + priv->end,
1117                              count,
1118                              io_priority,
1119                              cancellable,
1120                              fill_async_callback,
1121                              simple);
1122 }
1123
1124 static gssize
1125 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1126                                           GAsyncResult         *result,
1127                                           GError              **error)
1128 {
1129   GSimpleAsyncResult *simple;
1130   gssize nread;
1131
1132   simple = G_SIMPLE_ASYNC_RESULT (result);
1133   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1134
1135   if (g_simple_async_result_propagate_error (simple, error))
1136     return -1;
1137
1138   nread = g_simple_async_result_get_op_res_gssize (simple);
1139   return nread;
1140 }
1141
1142 typedef struct
1143 {
1144   gssize bytes_skipped;
1145   gssize count;
1146 } SkipAsyncData;
1147
1148 static void
1149 free_skip_async_data (gpointer _data)
1150 {
1151   SkipAsyncData *data = _data;
1152   g_slice_free (SkipAsyncData, data);
1153 }
1154
1155 static void
1156 large_skip_callback (GObject      *source_object,
1157                      GAsyncResult *result,
1158                      gpointer      user_data)
1159 {
1160   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1161   SkipAsyncData *data;
1162   GError *error;
1163   gssize nread;
1164
1165   data = g_simple_async_result_get_op_res_gpointer (simple);
1166
1167   error = NULL;
1168   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1169                                       result, &error);
1170
1171   /* Only report the error if we've not already read some data */
1172   if (nread < 0 && data->bytes_skipped == 0)
1173     g_simple_async_result_take_error (simple, error);
1174   else if (error)
1175     g_error_free (error);
1176
1177   if (nread > 0)
1178     data->bytes_skipped += nread;
1179
1180   /* Complete immediately, not in idle, since we're already
1181    * in a mainloop callout
1182    */
1183   g_simple_async_result_complete (simple);
1184   g_object_unref (simple);
1185 }
1186
1187 static void
1188 skip_fill_buffer_callback (GObject      *source_object,
1189                            GAsyncResult *result,
1190                            gpointer      user_data)
1191 {
1192   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1193   GBufferedInputStream *bstream;
1194   GBufferedInputStreamPrivate *priv;
1195   SkipAsyncData *data;
1196   GError *error;
1197   gssize nread;
1198   gsize available;
1199
1200   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1201   priv = bstream->priv;
1202
1203   data = g_simple_async_result_get_op_res_gpointer (simple);
1204
1205   error = NULL;
1206   nread = g_buffered_input_stream_fill_finish (bstream,
1207                                                result, &error);
1208
1209   if (nread < 0 && data->bytes_skipped == 0)
1210     g_simple_async_result_take_error (simple, error);
1211   else if (error)
1212     g_error_free (error);
1213
1214   if (nread > 0)
1215     {
1216       available = priv->end - priv->pos;
1217       data->count = MIN (data->count, available);
1218
1219       data->bytes_skipped += data->count;
1220       priv->pos += data->count;
1221     }
1222
1223   /* Complete immediately, not in idle, since we're already
1224    * in a mainloop callout
1225    */
1226   g_simple_async_result_complete (simple);
1227   g_object_unref (simple);
1228 }
1229
1230 static void
1231 g_buffered_input_stream_skip_async (GInputStream        *stream,
1232                                     gsize                count,
1233                                     int                  io_priority,
1234                                     GCancellable        *cancellable,
1235                                     GAsyncReadyCallback  callback,
1236                                     gpointer             user_data)
1237 {
1238   GBufferedInputStream *bstream;
1239   GBufferedInputStreamPrivate *priv;
1240   GBufferedInputStreamClass *class;
1241   GInputStream *base_stream;
1242   gsize available;
1243   GSimpleAsyncResult *simple;
1244   SkipAsyncData *data;
1245
1246   bstream = G_BUFFERED_INPUT_STREAM (stream);
1247   priv = bstream->priv;
1248
1249   data = g_slice_new (SkipAsyncData);
1250   data->bytes_skipped = 0;
1251   simple = g_simple_async_result_new (G_OBJECT (stream),
1252                                       callback, user_data,
1253                                       g_buffered_input_stream_skip_async);
1254   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1255
1256   available = priv->end - priv->pos;
1257
1258   if (count <= available)
1259     {
1260       priv->pos += count;
1261       data->bytes_skipped = count;
1262
1263       g_simple_async_result_complete_in_idle (simple);
1264       g_object_unref (simple);
1265       return;
1266     }
1267
1268   /* Full request not available, skip all currently available
1269    * and request refill for more
1270    */
1271
1272   priv->pos = 0;
1273   priv->end = 0;
1274
1275   count -= available;
1276
1277   data->bytes_skipped = available;
1278   data->count = count;
1279
1280   if (count > priv->len)
1281     {
1282       /* Large request, shortcut buffer */
1283
1284       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1285
1286       g_input_stream_skip_async (base_stream,
1287                                  count,
1288                                  io_priority, cancellable,
1289                                  large_skip_callback,
1290                                  simple);
1291     }
1292   else
1293     {
1294       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1295       class->fill_async (bstream, priv->len, io_priority, cancellable,
1296                          skip_fill_buffer_callback, simple);
1297     }
1298 }
1299
1300 static gssize
1301 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1302                                      GAsyncResult   *result,
1303                                      GError        **error)
1304 {
1305   GSimpleAsyncResult *simple;
1306   SkipAsyncData *data;
1307
1308   simple = G_SIMPLE_ASYNC_RESULT (result);
1309
1310   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1311
1312   data = g_simple_async_result_get_op_res_gpointer (simple);
1313
1314   return data->bytes_skipped;
1315 }