Some more property work
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @see_also: #GFilterInputStream, #GInputStream
37  * 
38  * Buffered input stream implements #GFilterInputStream and provides 
39  * for buffered reads. 
40  * 
41  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered input stream, use g_buffered_input_stream_new(), 
44  * or g_buffered_input_stream_new_sized() to specify the buffer's size at 
45  * construction.
46  * 
47  * To get the size of a buffer within a buffered input stream, use 
48  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
49  * buffered input stream's buffer, use
50  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 
51  * cannot be reduced below the size of the data within the buffer.
52  *
53  **/
54
55
56
57 #define DEFAULT_BUFFER_SIZE 4096
58
59 struct _GBufferedInputStreamPrivate {
60   guint8 *buffer;
61   gsize   len;
62   gsize   pos;
63   gsize   end;
64   GAsyncReadyCallback outstanding_callback;
65 };
66
67 enum {
68   PROP_0,
69   PROP_BUFSIZE
70 };
71
72 static void g_buffered_input_stream_set_property  (GObject      *object,
73                                                    guint         prop_id,
74                                                    const GValue *value,
75                                                    GParamSpec   *pspec);
76
77 static void g_buffered_input_stream_get_property  (GObject      *object,
78                                                    guint         prop_id,
79                                                    GValue       *value,
80                                                    GParamSpec   *pspec);
81 static void g_buffered_input_stream_finalize      (GObject *object);
82
83
84 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
85                                                         gsize                  count,
86                                                         GCancellable          *cancellable,
87                                                         GError               **error);
88 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
89                                                         gsize                  count,
90                                                         int                    io_priority,
91                                                         GCancellable          *cancellable,
92                                                         GAsyncReadyCallback    callback,
93                                                         gpointer               user_data);
94 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
95                                                         GAsyncResult          *result,
96                                                         GError               **error);
97 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
98                                                         void                  *buffer,
99                                                         gsize                  count,
100                                                         GCancellable          *cancellable,
101                                                         GError               **error);
102 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
103                                                         void                  *buffer,
104                                                         gsize                  count,
105                                                         int                    io_priority,
106                                                         GCancellable          *cancellable,
107                                                         GAsyncReadyCallback    callback,
108                                                         gpointer               user_data);
109 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
110                                                         GAsyncResult          *result,
111                                                         GError               **error);
112 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
113                                                         gssize                 count,
114                                                         GCancellable          *cancellable,
115                                                         GError               **error);
116 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
117                                                         gssize                 count,
118                                                         int                    io_priority,
119                                                         GCancellable          *cancellable,
120                                                         GAsyncReadyCallback    callback,
121                                                         gpointer               user_data);
122 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
123                                                         GAsyncResult          *result,
124                                                         GError               **error);
125
126 static void compact_buffer (GBufferedInputStream *stream);
127
128 G_DEFINE_TYPE (GBufferedInputStream,
129                g_buffered_input_stream,
130                G_TYPE_FILTER_INPUT_STREAM)
131
132
133 static void
134 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
135 {
136   GObjectClass *object_class;
137   GInputStreamClass *istream_class;
138   GBufferedInputStreamClass *bstream_class;
139
140   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
141
142   object_class = G_OBJECT_CLASS (klass);
143   object_class->get_property = g_buffered_input_stream_get_property;
144   object_class->set_property = g_buffered_input_stream_set_property;
145   object_class->finalize     = g_buffered_input_stream_finalize;
146
147   istream_class = G_INPUT_STREAM_CLASS (klass);
148   istream_class->skip = g_buffered_input_stream_skip;
149   istream_class->skip_async  = g_buffered_input_stream_skip_async;
150   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
151   istream_class->read = g_buffered_input_stream_read;
152   istream_class->read_async  = g_buffered_input_stream_read_async;
153   istream_class->read_finish = g_buffered_input_stream_read_finish;
154
155   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
156   bstream_class->fill = g_buffered_input_stream_real_fill;
157   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
158   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
159   
160   g_object_class_install_property (object_class,
161                                    PROP_BUFSIZE,
162                                    g_param_spec_uint ("buffer-size",
163                                                       P_("Buffer Size"),
164                                                       P_("The size of the backend buffer"),
165                                                       1,
166                                                       G_MAXUINT,
167                                                       DEFAULT_BUFFER_SIZE,
168                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
169                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
170
171
172 }
173
174 /**
175  * g_buffered_input_stream_get_buffer_size:
176  * @stream: #GBufferedInputStream.
177  * 
178  * Gets the size of the input buffer.
179  * 
180  * Returns: the current buffer size, or %-1 on error.
181  **/
182 gsize
183 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
184 {
185   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
186
187   return stream->priv->len;
188 }
189
190 /**
191  * g_buffered_input_stream_set_buffer_size:
192  * @stream: #GBufferedInputStream.
193  * @size: a #gsize.
194  *
195  * Sets the size of the internal buffer of @stream to @size, or to the 
196  * size of the contents of the buffer. The buffer can never be resized 
197  * smaller than its current contents.
198  **/
199 void
200 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
201                                          gsize                  size)
202 {
203   GBufferedInputStreamPrivate *priv;
204   gsize in_buffer;
205   guint8 *buffer;
206   
207   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
208
209   priv = stream->priv;
210
211   if (priv->len == size)
212     return;
213
214   if (priv->buffer)
215     {
216       in_buffer = priv->end - priv->pos;
217
218       /* Never resize smaller than current buffer contents */
219       size = MAX (size, in_buffer);
220
221       buffer = g_malloc (size);
222       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
223       priv->len = size;
224       priv->pos = 0;
225       priv->end = in_buffer;
226       g_free (priv->buffer);
227       priv->buffer = buffer;
228     }
229   else
230     {
231       priv->len = size;
232       priv->pos = 0;
233       priv->end = 0;
234       priv->buffer = g_malloc (size);
235     }
236
237   g_object_notify (G_OBJECT (stream), "buffer-size");
238 }
239
240 static void
241 g_buffered_input_stream_set_property (GObject      *object,
242                                       guint         prop_id,
243                                       const GValue *value,
244                                       GParamSpec   *pspec)
245 {
246   GBufferedInputStreamPrivate *priv;
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250   priv = bstream->priv;
251
252   switch (prop_id) 
253     {
254     case PROP_BUFSIZE:
255       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
256       break;
257
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
260       break;
261     }
262
263 }
264
265 static void
266 g_buffered_input_stream_get_property (GObject    *object,
267                                       guint       prop_id,
268                                       GValue     *value,
269                                       GParamSpec *pspec)
270 {
271   GBufferedInputStreamPrivate *priv;
272   GBufferedInputStream        *bstream;
273
274   bstream = G_BUFFERED_INPUT_STREAM (object);
275   priv = bstream->priv;
276
277   switch (prop_id)
278     { 
279     case PROP_BUFSIZE:
280       g_value_set_uint (value, priv->len);
281       break;
282
283     default:
284       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
285       break;
286     }
287 }
288
289 static void
290 g_buffered_input_stream_finalize (GObject *object)
291 {
292   GBufferedInputStreamPrivate *priv;
293   GBufferedInputStream        *stream;
294
295   stream = G_BUFFERED_INPUT_STREAM (object);
296   priv = stream->priv;
297
298   g_free (priv->buffer);
299
300   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
301     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
302 }
303
304 static void
305 g_buffered_input_stream_init (GBufferedInputStream *stream)
306 {
307   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
308                                               G_TYPE_BUFFERED_INPUT_STREAM,
309                                               GBufferedInputStreamPrivate);
310 }
311
312
313 /**
314  * g_buffered_input_stream_new:
315  * @base_stream: a #GInputStream.
316  * 
317  * Creates a new #GInputStream from the given @base_stream, with 
318  * a buffer set to the default size (4 kilobytes).
319  *
320  * Returns: a #GInputStream for the given @base_stream.
321  **/
322 GInputStream *
323 g_buffered_input_stream_new (GInputStream *base_stream)
324 {
325   GInputStream *stream;
326
327   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
328
329   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
330                          "base-stream", base_stream,
331                          NULL);
332
333   return stream;
334 }
335
336 /**
337  * g_buffered_input_stream_new_sized:
338  * @base_stream: a #GOutputStream.
339  * @size: a #gsize.
340  * 
341  * Creates a new #GBufferedInputStream from the given @base_stream, 
342  * with a buffer set to @size.
343  *
344  * Returns: a #GInputStream.
345  **/
346 GInputStream *
347 g_buffered_input_stream_new_sized (GInputStream *base_stream,
348                                    gsize         size)
349 {
350   GInputStream *stream;
351
352   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
353
354   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
355                          "base-stream", base_stream,
356                          "buffer-size", (guint)size,
357                          NULL);
358
359   return stream;
360 }
361
362 /**
363  * g_buffered_input_stream_fill:
364  * @stream: #GBufferedInputStream.
365  * @count: the number of bytes that will be read from the stream.
366  * @cancellable: optional #GCancellable object, %NULL to ignore.
367  * @error: location to store the error occuring, or %NULL to ignore.
368  *
369  * Tries to read @count bytes from the stream into the buffer. 
370  * Will block during this read.
371  * 
372  * If @count is zero, returns zero and does nothing. A value of @count
373  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
374  *
375  * On success, the number of bytes read into the buffer is returned.
376  * It is not an error if this is not the same as the requested size, as it
377  * can happen e.g. near the end of a file. Zero is returned on end of file
378  * (or if @count is zero),  but never otherwise.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  * 
388  * For the asynchronous, non-blocking, version of this function, see 
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count, 
392  *     or -1 on error.
393  **/
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405   
406   input_stream = G_INPUT_STREAM (stream);
407   
408   if (g_input_stream_is_closed (input_stream))
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
411                    _("Stream is already closed"));
412       return -1;
413     }
414   
415   if (g_input_stream_has_pending (input_stream))
416     {
417       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
418                    _("Stream has outstanding operation"));
419       return -1;
420     }
421       
422   g_input_stream_set_pending (input_stream, TRUE);
423
424   if (cancellable)
425     g_push_current_cancellable (cancellable);
426   
427   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428   res = class->fill (stream, count, cancellable, error);
429
430   if (cancellable)
431     g_pop_current_cancellable (cancellable);
432   
433   g_input_stream_set_pending (input_stream, FALSE);
434   
435   return res;
436 }
437
438 static void
439 async_fill_callback_wrapper (GObject      *source_object,
440                              GAsyncResult *res,
441                              gpointer      user_data)
442 {
443   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444
445   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
446   (*stream->priv->outstanding_callback) (source_object, res, user_data);
447   g_object_unref (stream);
448 }
449
450 /**
451  * g_buffered_input_stream_fill_async:
452  * @stream: #GBufferedInputStream.
453  * @count: a #gssize.
454  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
455  *     of the request.
456  * @cancellable: optional #GCancellable object
457  * @callback: a #GAsyncReadyCallback.
458  * @user_data: a #gpointer.
459  *
460  * Reads data into @stream's buffer asynchronously, up to @count size.
461  * @io_priority can be used to prioritize reads. For the synchronous
462  * version of this function, see g_buffered_input_stream_fill().
463  **/
464 void
465 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
466                                     gssize                count,
467                                     int                   io_priority,
468                                     GCancellable         *cancellable,
469                                     GAsyncReadyCallback   callback,
470                                     gpointer              user_data)
471 {
472   GBufferedInputStreamClass *class;
473   GSimpleAsyncResult *simple;
474
475   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
476
477   if (count == 0)
478     {
479       simple = g_simple_async_result_new (G_OBJECT (stream),
480                                           callback,
481                                           user_data,
482                                           g_buffered_input_stream_fill_async);
483       g_simple_async_result_complete_in_idle (simple);
484       g_object_unref (simple);
485       return;
486     }
487   
488   if (((gssize) count) < 0)
489     {
490       g_simple_async_report_error_in_idle (G_OBJECT (stream),
491                                            callback,
492                                            user_data,
493                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
494                                            _("Too large count value passed to g_input_stream_read_async"));
495       return;
496     }
497   
498   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
499     {
500       g_simple_async_report_error_in_idle (G_OBJECT (stream),
501                                            callback,
502                                            user_data,
503                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
504                                            _("Stream is already closed"));
505       return;
506     }
507     
508   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
509     {
510       g_simple_async_report_error_in_idle (G_OBJECT (stream),
511                                            callback,
512                                            user_data,
513                                            G_IO_ERROR, G_IO_ERROR_PENDING,
514                                            _("Stream has outstanding operation"));
515       return;
516     }
517
518   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
519   
520   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
521   stream->priv->outstanding_callback = callback;
522   g_object_ref (stream);
523   class->fill_async (stream, count, io_priority, cancellable,
524                      async_fill_callback_wrapper, user_data);
525 }
526
527 /**
528  * g_buffered_input_stream_fill_finish:
529  * @stream: a #GBufferedInputStream.
530  * @result: a #GAsyncResult.
531  * @error: a #GError.
532  *
533  * Finishes an asynchronous read.
534  * 
535  * Returns: a #gssize of the read stream, or %-1 on an error. 
536  **/
537 gssize
538 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
539                                      GAsyncResult          *result,
540                                      GError               **error)
541 {
542   GSimpleAsyncResult *simple;
543   GBufferedInputStreamClass *class;
544
545   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
546   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
547
548   if (G_IS_SIMPLE_ASYNC_RESULT (result))
549     {
550       simple = G_SIMPLE_ASYNC_RESULT (result);
551       if (g_simple_async_result_propagate_error (simple, error))
552         return -1;
553
554       /* Special case read of 0 bytes */
555       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
556         return 0;
557     }
558
559   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
560   return class->fill_finish (stream, result, error);
561 }
562
563 /**
564  * g_buffered_input_stream_get_available:
565  * @stream: #GBufferedInputStream.
566  * 
567  * Gets the size of the available data within the stream.
568  * 
569  * Returns: size of the available stream. 
570  **/
571 gsize
572 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
573 {
574   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
575
576   return stream->priv->end - stream->priv->pos;
577 }
578
579 /**
580  * g_buffered_input_stream_peek:
581  * @stream: a #GBufferedInputStream.
582  * @buffer: a pointer to an allocated chunk of memory.
583  * @offset: a #gsize.
584  * @count: a #gsize.
585  * 
586  * Peeks in the buffer, copying data of size @count into @buffer, 
587  * offset @offset bytes.
588  * 
589  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
590  **/
591 gsize
592 g_buffered_input_stream_peek (GBufferedInputStream *stream,
593                               void                 *buffer,
594                               gsize                 offset,
595                               gsize                 count)
596 {
597   gsize available;
598   gsize end;
599   
600   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
601   g_return_val_if_fail (buffer != NULL, -1);
602
603   available = g_buffered_input_stream_get_available (stream);
604
605   if (offset > available)
606     return 0;
607   
608   end = MIN (offset + count, available);
609   count = end - offset;
610   
611   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
612   return count;
613 }
614
615 /**
616  * g_buffered_input_stream_peek_buffer:
617  * @stream: a #GBufferedInputStream.
618  * @count: a #gsize to get the number of bytes available in the buffer.
619  *
620  * Returns the buffer with the currently available bytes. The returned
621  * buffer must not be modified and will become invalid when reading from
622  * the stream or filling the buffer.
623  *
624  * Returns: read-only buffer
625  **/
626 const void*
627 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
628                                      gsize                *count)
629 {
630   GBufferedInputStreamPrivate *priv;
631
632   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
633
634   priv = stream->priv;
635
636   if (count) 
637     *count = priv->end - priv->pos;
638
639   return priv->buffer + priv->pos;
640 }
641
642 static void
643 compact_buffer (GBufferedInputStream *stream)
644 {
645   GBufferedInputStreamPrivate *priv;
646   gsize current_size;
647
648   priv = stream->priv;
649
650   current_size = priv->end - priv->pos;
651   
652   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
653   
654   priv->pos = 0;
655   priv->end = current_size;
656 }
657
658 static gssize
659 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
660                                    gssize                 count,
661                                    GCancellable          *cancellable,
662                                    GError               **error)
663 {
664   GBufferedInputStreamPrivate *priv;
665   GInputStream *base_stream;
666   gssize nread;
667   gsize in_buffer;
668
669   priv = stream->priv;
670
671   if (count == -1)
672     count = priv->len;
673   
674   in_buffer = priv->end - priv->pos;
675
676   /* Never fill more than can fit in the buffer */
677   count = MIN (count, priv->len - in_buffer);
678
679   /* If requested length does not fit at end, compact */
680   if (priv->len - priv->end < count)
681     compact_buffer (stream);
682
683   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
684   nread = g_input_stream_read (base_stream,
685                                priv->buffer + priv->end,
686                                count,
687                                cancellable,
688                                error);
689
690   if (nread > 0)
691     priv->end += nread;
692   
693   return nread;
694 }
695
696 static gssize
697 g_buffered_input_stream_skip (GInputStream  *stream,
698                               gsize          count,
699                               GCancellable  *cancellable,
700                               GError       **error)
701 {
702   GBufferedInputStream        *bstream;
703   GBufferedInputStreamPrivate *priv;
704   GInputStream *base_stream;
705   gsize available, bytes_skipped;
706   gssize nread;
707
708   bstream = G_BUFFERED_INPUT_STREAM (stream);
709   priv = bstream->priv;
710
711   available = priv->end - priv->pos;
712
713   if (count <= available)
714     {
715       priv->pos += count;
716       return count;
717     }
718
719   /* Full request not available, skip all currently available and 
720    * request refill for more 
721    */
722   
723   priv->pos = 0;
724   priv->end = 0;
725   bytes_skipped = available;
726   count -= available;
727
728   if (bytes_skipped > 0)
729     error = NULL; /* Ignore further errors if we already read some data */
730   
731   if (count > priv->len)
732     {
733       /* Large request, shortcut buffer */
734       
735       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
736
737       nread = g_input_stream_skip (base_stream,
738                                    count,
739                                    cancellable,
740                                    error);
741       
742       if (nread < 0 && bytes_skipped == 0)
743         return -1;
744       
745       if (nread > 0)
746         bytes_skipped += nread;
747       
748       return bytes_skipped;
749     }
750   
751   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
752   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
753   g_input_stream_set_pending (stream, TRUE); /* enable again */
754   
755   if (nread < 0)
756     {
757       if (bytes_skipped == 0)
758         return -1;
759       else
760         return bytes_skipped;
761     }
762   
763   available = priv->end - priv->pos;
764   count = MIN (count, available);
765   
766   bytes_skipped += count;
767   priv->pos += count;
768   
769   return bytes_skipped;
770 }
771
772 static gssize
773 g_buffered_input_stream_read (GInputStream *stream,
774                               void         *buffer,
775                               gsize         count,
776                               GCancellable *cancellable,
777                               GError      **error)
778 {
779   GBufferedInputStream        *bstream;
780   GBufferedInputStreamPrivate *priv;
781   GInputStream *base_stream;
782   gsize available, bytes_read;
783   gssize nread;
784
785   bstream = G_BUFFERED_INPUT_STREAM (stream);
786   priv = bstream->priv;
787
788   available = priv->end - priv->pos;
789
790   if (count <= available)
791     {
792       memcpy (buffer, priv->buffer + priv->pos, count);
793       priv->pos += count;
794       return count;
795     }
796   
797   /* Full request not available, read all currently availbile and request refill for more */
798   
799   memcpy (buffer, priv->buffer + priv->pos, available);
800   priv->pos = 0;
801   priv->end = 0;
802   bytes_read = available;
803   count -= available;
804
805   if (bytes_read > 0)
806     error = NULL; /* Ignore further errors if we already read some data */
807   
808   if (count > priv->len)
809     {
810       /* Large request, shortcut buffer */
811       
812       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
813
814       nread = g_input_stream_read (base_stream,
815                                    (char *)buffer + bytes_read,
816                                    count,
817                                    cancellable,
818                                    error);
819       
820       if (nread < 0 && bytes_read == 0)
821         return -1;
822       
823       if (nread > 0)
824         bytes_read += nread;
825       
826       return bytes_read;
827     }
828   
829   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
830   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
831   g_input_stream_set_pending (stream, TRUE); /* enable again */
832   if (nread < 0)
833     {
834       if (bytes_read == 0)
835         return -1;
836       else
837         return bytes_read;
838     }
839   
840   available = priv->end - priv->pos;
841   count = MIN (count, available);
842   
843   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
844   bytes_read += count;
845   priv->pos += count;
846   
847   return bytes_read;
848 }
849
850 /**
851  * g_buffered_input_stream_read_byte:
852  * @stream: #GBufferedInputStream.
853  * @cancellable: optional #GCancellable object, %NULL to ignore.
854  * @error: location to store the error occuring, or %NULL to ignore.
855  *
856  * Tries to read a single byte from the stream or the buffer. Will block
857  * during this read.
858  *
859  * On success, the byte read from the stream is returned. On end of stream
860  * -1 is returned but it's not an exceptional error and @error is not set.
861  * 
862  * If @cancellable is not %NULL, then the operation can be cancelled by
863  * triggering the cancellable object from another thread. If the operation
864  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
865  * operation was partially finished when the operation was cancelled the
866  * partial result will be returned, without an error.
867  *
868  * On error -1 is returned and @error is set accordingly.
869  * 
870  * Returns: the byte read from the @stream, or -1 on end of stream or error.
871  **/
872 int
873 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
874                                    GCancellable          *cancellable,
875                                    GError               **error)
876 {
877   GBufferedInputStreamPrivate *priv;
878   GInputStream *input_stream;
879   gsize available;
880   gssize nread;
881
882   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
883
884   priv = stream->priv;
885   input_stream = G_INPUT_STREAM (stream);
886
887   if (g_input_stream_is_closed (input_stream))
888     {
889       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
890        _("Stream is already closed"));
891       return -1;
892     }
893
894   if (g_input_stream_has_pending (input_stream))
895     {
896       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
897        _("Stream has outstanding operation"));
898       return -1;
899     }
900
901   available = priv->end - priv->pos;
902
903   if (available < 1)
904     return priv->buffer[priv->pos++];
905
906   /* Byte not available, request refill for more */
907
908   if (cancellable)
909     g_push_current_cancellable (cancellable);
910
911   priv->pos = 0;
912   priv->end = 0;
913
914   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
915
916   if (cancellable)
917     g_pop_current_cancellable (cancellable);
918
919   if (nread <= 0)
920     return -1; /* error or end of stream */
921
922   return priv->buffer[priv->pos++];
923 }
924
925 /* ************************** */
926 /* Async stuff implementation */
927 /* ************************** */
928
929 static void
930 fill_async_callback (GObject      *source_object,
931                      GAsyncResult *result,
932                      gpointer      user_data)
933 {
934   GError *error;
935   gssize res;
936   GSimpleAsyncResult *simple;
937
938   simple = user_data;
939   
940   error = NULL;
941   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
942                                     result, &error);
943
944   g_simple_async_result_set_op_res_gssize (simple, res);
945   if (res == -1)
946     {
947       g_simple_async_result_set_from_error (simple, error);
948       g_error_free (error);
949     }
950   
951   /* Complete immediately, not in idle, since we're already in a mainloop callout */
952   g_simple_async_result_complete (simple);
953   g_object_unref (simple);
954 }
955
956 static void
957 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
958                                          gssize                count,
959                                          int                   io_priority,
960                                          GCancellable         *cancellable,
961                                          GAsyncReadyCallback   callback,
962                                          gpointer              user_data)
963 {
964   GBufferedInputStreamPrivate *priv;
965   GInputStream *base_stream;
966   GSimpleAsyncResult *simple;
967   gsize in_buffer;
968
969   priv = stream->priv;
970
971   if (count == -1)
972     count = priv->len;
973   
974   in_buffer = priv->end - priv->pos;
975
976   /* Never fill more than can fit in the buffer */
977   count = MIN (count, priv->len - in_buffer);
978
979   /* If requested length does not fit at end, compact */
980   if (priv->len - priv->end < count)
981     compact_buffer (stream);
982
983   simple = g_simple_async_result_new (G_OBJECT (stream),
984                                       callback, user_data,
985                                       g_buffered_input_stream_real_fill_async);
986   
987   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
988   g_input_stream_read_async (base_stream,
989                              priv->buffer + priv->end,
990                              count,
991                              io_priority,
992                              cancellable,
993                              fill_async_callback,
994                              simple);
995 }
996
997 static gssize
998 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
999                                           GAsyncResult         *result,
1000                                           GError              **error)
1001 {
1002   GSimpleAsyncResult *simple;
1003   gssize nread;
1004
1005   simple = G_SIMPLE_ASYNC_RESULT (result);
1006   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1007   
1008   nread = g_simple_async_result_get_op_res_gssize (simple);
1009   return nread;
1010 }
1011
1012 typedef struct {
1013   gssize bytes_read;
1014   gssize count;
1015   void *buffer;
1016 } ReadAsyncData;
1017
1018 static void 
1019 free_read_async_data (gpointer _data)
1020 {
1021   ReadAsyncData *data = _data;
1022   g_slice_free (ReadAsyncData, data);
1023 }
1024
1025 static void
1026 large_read_callback (GObject *source_object,
1027                      GAsyncResult *result,
1028                      gpointer user_data)
1029 {
1030   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1031   ReadAsyncData *data;
1032   GError *error;
1033   gssize nread;
1034
1035   data = g_simple_async_result_get_op_res_gpointer (simple);
1036   
1037   error = NULL;
1038   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1039                                       result, &error);
1040
1041   /* Only report the error if we've not already read some data */
1042   if (nread < 0 && data->bytes_read == 0)
1043     g_simple_async_result_set_from_error (simple, error);
1044   
1045   if (nread > 0)
1046     data->bytes_read += nread;
1047   
1048   if (error)
1049     g_error_free (error);
1050   
1051   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1052   g_simple_async_result_complete (simple);
1053   g_object_unref (simple);
1054 }
1055
1056 static void
1057 read_fill_buffer_callback (GObject *source_object,
1058                            GAsyncResult *result,
1059                            gpointer user_data)
1060 {
1061   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1062   GBufferedInputStream *bstream;
1063   GBufferedInputStreamPrivate *priv;
1064   ReadAsyncData *data;
1065   GError *error;
1066   gssize nread;
1067   gsize available;
1068
1069   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1070   priv = bstream->priv;
1071   
1072   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1073   
1074   data = g_simple_async_result_get_op_res_gpointer (simple);
1075   
1076   error = NULL;
1077   nread = g_buffered_input_stream_fill_finish (bstream,
1078                                                result, &error);
1079   
1080   if (nread < 0 && data->bytes_read == 0)
1081     g_simple_async_result_set_from_error (simple, error);
1082
1083
1084   if (nread > 0)
1085     {
1086       available = priv->end - priv->pos;
1087       data->count = MIN (data->count, available);
1088       
1089       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1090       data->bytes_read += data->count;
1091       priv->pos += data->count;
1092     }
1093
1094   if (error)
1095     g_error_free (error);
1096   
1097   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1098   g_simple_async_result_complete (simple);
1099   g_object_unref (simple);
1100 }
1101
1102 static void
1103 g_buffered_input_stream_read_async (GInputStream              *stream,
1104                                     void                      *buffer,
1105                                     gsize                      count,
1106                                     int                        io_priority,
1107                                     GCancellable              *cancellable,
1108                                     GAsyncReadyCallback        callback,
1109                                     gpointer                   user_data)
1110 {
1111   GBufferedInputStream *bstream;
1112   GBufferedInputStreamPrivate *priv;
1113   GInputStream *base_stream;
1114   gsize available;
1115   GSimpleAsyncResult *simple;
1116   ReadAsyncData *data;
1117
1118   bstream = G_BUFFERED_INPUT_STREAM (stream);
1119   priv = bstream->priv;
1120
1121   data = g_slice_new (ReadAsyncData);
1122   data->buffer = buffer;
1123   data->bytes_read = 0;
1124   simple = g_simple_async_result_new (G_OBJECT (stream),
1125                                       callback, user_data,
1126                                       g_buffered_input_stream_read_async);
1127   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1128   
1129   available = priv->end - priv->pos;
1130   
1131   if (count <= available)
1132     {
1133       memcpy (buffer, priv->buffer + priv->pos, count);
1134       priv->pos += count;
1135       data->bytes_read = count;
1136       
1137       g_simple_async_result_complete_in_idle (simple);
1138       g_object_unref (simple);
1139       return;
1140     }
1141
1142
1143   /* Full request not available, read all currently availbile and request refill for more */
1144   
1145   memcpy (buffer, priv->buffer + priv->pos, available);
1146   priv->pos = 0;
1147   priv->end = 0;
1148   
1149   count -= available;
1150   
1151   data->bytes_read = available;
1152   data->count = count;
1153
1154   if (count > priv->len)
1155     {
1156       /* Large request, shortcut buffer */
1157       
1158       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1159       
1160       g_input_stream_read_async (base_stream,
1161                                  (char *)buffer + data->bytes_read,
1162                                  count,
1163                                  io_priority, cancellable,
1164                                  large_read_callback,
1165                                  simple);
1166     }
1167   else
1168     {
1169       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1170       g_buffered_input_stream_fill_async (bstream, priv->len,
1171                                           io_priority, cancellable,
1172                                           read_fill_buffer_callback, simple);
1173     }
1174 }
1175
1176 static gssize
1177 g_buffered_input_stream_read_finish (GInputStream   *stream,
1178                                      GAsyncResult   *result,
1179                                      GError        **error)
1180 {
1181   GSimpleAsyncResult *simple;
1182   ReadAsyncData *data;
1183   
1184   simple = G_SIMPLE_ASYNC_RESULT (result);
1185   
1186   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1187
1188   data = g_simple_async_result_get_op_res_gpointer (simple);
1189   
1190   return data->bytes_read;
1191 }
1192
1193 typedef struct {
1194   gssize bytes_skipped;
1195   gssize count;
1196 } SkipAsyncData;
1197
1198 static void 
1199 free_skip_async_data (gpointer _data)
1200 {
1201   SkipAsyncData *data = _data;
1202   g_slice_free (SkipAsyncData, data);
1203 }
1204
1205 static void
1206 large_skip_callback (GObject *source_object,
1207                      GAsyncResult *result,
1208                      gpointer user_data)
1209 {
1210   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1211   SkipAsyncData *data;
1212   GError *error;
1213   gssize nread;
1214
1215   data = g_simple_async_result_get_op_res_gpointer (simple);
1216   
1217   error = NULL;
1218   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1219                                       result, &error);
1220
1221   /* Only report the error if we've not already read some data */
1222   if (nread < 0 && data->bytes_skipped == 0)
1223     g_simple_async_result_set_from_error (simple, error);
1224   
1225   if (nread > 0)
1226     data->bytes_skipped += nread;
1227   
1228   if (error)
1229     g_error_free (error);
1230   
1231   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1232   g_simple_async_result_complete (simple);
1233   g_object_unref (simple);
1234 }
1235
1236 static void
1237 skip_fill_buffer_callback (GObject *source_object,
1238                            GAsyncResult *result,
1239                            gpointer user_data)
1240 {
1241   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1242   GBufferedInputStream *bstream;
1243   GBufferedInputStreamPrivate *priv;
1244   SkipAsyncData *data;
1245   GError *error;
1246   gssize nread;
1247   gsize available;
1248
1249   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1250   priv = bstream->priv;
1251   
1252   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1253   
1254   data = g_simple_async_result_get_op_res_gpointer (simple);
1255   
1256   error = NULL;
1257   nread = g_buffered_input_stream_fill_finish (bstream,
1258                                                result, &error);
1259   
1260   if (nread < 0 && data->bytes_skipped == 0)
1261     g_simple_async_result_set_from_error (simple, error);
1262
1263
1264   if (nread > 0)
1265     {
1266       available = priv->end - priv->pos;
1267       data->count = MIN (data->count, available);
1268       
1269       data->bytes_skipped += data->count;
1270       priv->pos += data->count;
1271     }
1272
1273   if (error)
1274     g_error_free (error);
1275   
1276   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1277   g_simple_async_result_complete (simple);
1278   g_object_unref (simple);
1279 }
1280
1281 static void
1282 g_buffered_input_stream_skip_async (GInputStream              *stream,
1283                                     gsize                      count,
1284                                     int                        io_priority,
1285                                     GCancellable              *cancellable,
1286                                     GAsyncReadyCallback        callback,
1287                                     gpointer                   user_data)
1288 {
1289   GBufferedInputStream *bstream;
1290   GBufferedInputStreamPrivate *priv;
1291   GInputStream *base_stream;
1292   gsize available;
1293   GSimpleAsyncResult *simple;
1294   SkipAsyncData *data;
1295
1296   bstream = G_BUFFERED_INPUT_STREAM (stream);
1297   priv = bstream->priv;
1298
1299   data = g_slice_new (SkipAsyncData);
1300   data->bytes_skipped = 0;
1301   simple = g_simple_async_result_new (G_OBJECT (stream),
1302                                       callback, user_data,
1303                                       g_buffered_input_stream_skip_async);
1304   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1305   
1306   available = priv->end - priv->pos;
1307   
1308   if (count <= available)
1309     {
1310       priv->pos += count;
1311       data->bytes_skipped = count;
1312       
1313       g_simple_async_result_complete_in_idle (simple);
1314       g_object_unref (simple);
1315       return;
1316     }
1317
1318
1319   /* Full request not available, skip all currently availbile and request refill for more */
1320   
1321   priv->pos = 0;
1322   priv->end = 0;
1323   
1324   count -= available;
1325   
1326   data->bytes_skipped = available;
1327   data->count = count;
1328
1329   if (count > priv->len)
1330     {
1331       /* Large request, shortcut buffer */
1332       
1333       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1334       
1335       g_input_stream_skip_async (base_stream,
1336                                  count,
1337                                  io_priority, cancellable,
1338                                  large_skip_callback,
1339                                  simple);
1340     }
1341   else
1342     {
1343       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1344       g_buffered_input_stream_fill_async (bstream, priv->len,
1345                                           io_priority, cancellable,
1346                                           skip_fill_buffer_callback, simple);
1347     }
1348 }
1349
1350 static gssize
1351 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1352                                      GAsyncResult   *result,
1353                                      GError        **error)
1354 {
1355   GSimpleAsyncResult *simple;
1356   SkipAsyncData *data;
1357   
1358   simple = G_SIMPLE_ASYNC_RESULT (result);
1359   
1360   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1361
1362   data = g_simple_async_result_get_op_res_gpointer (simple);
1363   
1364   return data->bytes_skipped;
1365 }
1366
1367
1368 #define __G_BUFFERED_INPUT_STREAM_C__
1369 #include "gioaliasdef.c"