Coding style fixups
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @see_also: #GFilterInputStream, #GInputStream
37  * 
38  * Buffered input stream implements #GFilterInputStream and provides 
39  * for buffered reads. 
40  * 
41  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered input stream, use g_buffered_input_stream_new(), or 
44  * g_buffered_input_stream_new_sized() to specify the buffer's size at construction.
45  * 
46  * To get the size of a buffer within a buffered input stream, use 
47  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
48  * buffered input stream's buffer, use g_buffered_input_stream_set_buffer_size(). 
49  * Note: the buffer's size cannot be reduced below the size of the data within the
50  * buffer.
51  *
52  **/
53
54
55
56 #define DEFAULT_BUFFER_SIZE 4096
57
58 struct _GBufferedInputStreamPrivate {
59   guint8 *buffer;
60   gsize   len;
61   gsize   pos;
62   gsize   end;
63   GAsyncReadyCallback outstanding_callback;
64 };
65
66 enum {
67   PROP_0,
68   PROP_BUFSIZE
69 };
70
71 static void g_buffered_input_stream_set_property  (GObject      *object,
72                                                    guint         prop_id,
73                                                    const GValue *value,
74                                                    GParamSpec   *pspec);
75
76 static void g_buffered_input_stream_get_property  (GObject      *object,
77                                                    guint         prop_id,
78                                                    GValue       *value,
79                                                    GParamSpec   *pspec);
80 static void g_buffered_input_stream_finalize      (GObject *object);
81
82
83 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
84                                                         gsize                  count,
85                                                         GCancellable          *cancellable,
86                                                         GError               **error);
87 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
88                                                         gsize                  count,
89                                                         int                    io_priority,
90                                                         GCancellable          *cancellable,
91                                                         GAsyncReadyCallback    callback,
92                                                         gpointer               user_data);
93 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
94                                                         GAsyncResult          *result,
95                                                         GError               **error);
96 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
97                                                         void                  *buffer,
98                                                         gsize                  count,
99                                                         GCancellable          *cancellable,
100                                                         GError               **error);
101 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
102                                                         void                  *buffer,
103                                                         gsize                  count,
104                                                         int                    io_priority,
105                                                         GCancellable          *cancellable,
106                                                         GAsyncReadyCallback    callback,
107                                                         gpointer               user_data);
108 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
109                                                         GAsyncResult          *result,
110                                                         GError               **error);
111 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
112                                                         gssize                 count,
113                                                         GCancellable          *cancellable,
114                                                         GError               **error);
115 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
116                                                         gssize                 count,
117                                                         int                    io_priority,
118                                                         GCancellable          *cancellable,
119                                                         GAsyncReadyCallback    callback,
120                                                         gpointer               user_data);
121 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
122                                                         GAsyncResult          *result,
123                                                         GError               **error);
124
125 static void compact_buffer (GBufferedInputStream *stream);
126
127 G_DEFINE_TYPE (GBufferedInputStream,
128                g_buffered_input_stream,
129                G_TYPE_FILTER_INPUT_STREAM)
130
131
132 static void
133 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
134 {
135   GObjectClass *object_class;
136   GInputStreamClass *istream_class;
137   GBufferedInputStreamClass *bstream_class;
138
139   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
140
141   object_class = G_OBJECT_CLASS (klass);
142   object_class->get_property = g_buffered_input_stream_get_property;
143   object_class->set_property = g_buffered_input_stream_set_property;
144   object_class->finalize     = g_buffered_input_stream_finalize;
145
146   istream_class = G_INPUT_STREAM_CLASS (klass);
147   istream_class->skip = g_buffered_input_stream_skip;
148   istream_class->skip_async  = g_buffered_input_stream_skip_async;
149   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
150   istream_class->read = g_buffered_input_stream_read;
151   istream_class->read_async  = g_buffered_input_stream_read_async;
152   istream_class->read_finish = g_buffered_input_stream_read_finish;
153
154   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
155   bstream_class->fill = g_buffered_input_stream_real_fill;
156   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
157   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
158   
159   g_object_class_install_property (object_class,
160                                    PROP_BUFSIZE,
161                                    g_param_spec_uint ("buffer-size",
162                                                       P_("Buffer Size"),
163                                                       P_("The size of the backend buffer"),
164                                                       1,
165                                                       G_MAXUINT,
166                                                       DEFAULT_BUFFER_SIZE,
167                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
168                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
169
170
171 }
172
173 /**
174  * g_buffered_input_stream_get_buffer_size:
175  * @stream: #GBufferedInputStream.
176  * 
177  * Gets the size of the input buffer.
178  * 
179  * Returns: the current buffer size, or %-1 on error.
180  **/
181 gsize
182 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
183 {
184   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
185
186   return stream->priv->len;
187 }
188
189 /**
190  * g_buffered_input_stream_set_buffer_size:
191  * @stream: #GBufferedInputStream.
192  * @size: a #gsize.
193  *
194  * Sets the size of the internal buffer of @stream to @size, or to the 
195  * size of the contents of the buffer. The buffer can never be resized 
196  * smaller than its current contents.
197  **/
198 void
199 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
200                                          gsize                  size)
201 {
202   GBufferedInputStreamPrivate *priv;
203   gsize in_buffer;
204   guint8 *buffer;
205   
206   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
207
208   priv = stream->priv;
209
210   if (priv->buffer)
211     {
212       in_buffer = priv->end - priv->pos;
213
214       /* Never resize smaller than current buffer contents */
215       size = MAX (size, in_buffer);
216
217       buffer = g_malloc (size);
218       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
219       priv->len = size;
220       priv->pos = 0;
221       priv->end = in_buffer;
222       g_free (priv->buffer);
223       priv->buffer = buffer;
224     }
225   else
226     {
227       priv->len = size;
228       priv->pos = 0;
229       priv->end = 0;
230       priv->buffer = g_malloc (size);
231     }
232 }
233
234 static void
235 g_buffered_input_stream_set_property (GObject      *object,
236                                       guint         prop_id,
237                                       const GValue *value,
238                                       GParamSpec   *pspec)
239 {
240   GBufferedInputStreamPrivate *priv;
241   GBufferedInputStream        *bstream;
242
243   bstream = G_BUFFERED_INPUT_STREAM (object);
244   priv = bstream->priv;
245
246   switch (prop_id) 
247     {
248     case PROP_BUFSIZE:
249       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
250       break;
251
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255     }
256
257 }
258
259 static void
260 g_buffered_input_stream_get_property (GObject    *object,
261                                       guint       prop_id,
262                                       GValue     *value,
263                                       GParamSpec *pspec)
264 {
265   GBufferedInputStreamPrivate *priv;
266   GBufferedInputStream        *bstream;
267
268   bstream = G_BUFFERED_INPUT_STREAM (object);
269   priv = bstream->priv;
270
271   switch (prop_id)
272     { 
273     case PROP_BUFSIZE:
274       g_value_set_uint (value, priv->len);
275       break;
276
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280     }
281 }
282
283 static void
284 g_buffered_input_stream_finalize (GObject *object)
285 {
286   GBufferedInputStreamPrivate *priv;
287   GBufferedInputStream        *stream;
288
289   stream = G_BUFFERED_INPUT_STREAM (object);
290   priv = stream->priv;
291
292   g_free (priv->buffer);
293
294   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
295     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
296 }
297
298 static void
299 g_buffered_input_stream_init (GBufferedInputStream *stream)
300 {
301   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
302                                               G_TYPE_BUFFERED_INPUT_STREAM,
303                                               GBufferedInputStreamPrivate);
304 }
305
306
307 /**
308  * g_buffered_input_stream_new:
309  * @base_stream: a #GInputStream.
310  * 
311  * Creates a new #GInputStream from the given @base_stream, with 
312  * a buffer set to the default size (4 kilobytes).
313  *
314  * Returns: a #GInputStream for the given @base_stream.
315  **/
316 GInputStream *
317 g_buffered_input_stream_new (GInputStream *base_stream)
318 {
319   GInputStream *stream;
320
321   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
322
323   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
324                          "base-stream", base_stream,
325                          NULL);
326
327   return stream;
328 }
329
330 /**
331  * g_buffered_input_stream_new_sized:
332  * @base_stream: a #GOutputStream.
333  * @size: a #gsize.
334  * 
335  * Creates a new #GBufferedInputStream from the given @base_stream, 
336  * with a buffer set to @size.
337  *
338  * Returns: a #GInputStream.
339  **/
340 GInputStream *
341 g_buffered_input_stream_new_sized (GInputStream *base_stream,
342                                    gsize         size)
343 {
344   GInputStream *stream;
345
346   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
347
348   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
349                          "base-stream", base_stream,
350                          "buffer-size", (guint)size,
351                          NULL);
352
353   return stream;
354 }
355
356 /**
357  * g_buffered_input_stream_fill:
358  * @stream: #GBufferedInputStream.
359  * @count: the number of bytes that will be read from the stream.
360  * @cancellable: optional #GCancellable object, %NULL to ignore.
361  * @error: location to store the error occuring, or %NULL to ignore.
362  *
363  * Tries to read @count bytes from the stream into the buffer. 
364  * Will block during this read.
365  * 
366  * If @count is zero, returns zero and does nothing. A value of @count
367  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
368  *
369  * On success, the number of bytes read into the buffer is returned.
370  * It is not an error if this is not the same as the requested size, as it
371  * can happen e.g. near the end of a file. Zero is returned on end of file
372  * (or if @count is zero),  but never otherwise.
373  *
374  * If @cancellable is not %NULL, then the operation can be cancelled by
375  * triggering the cancellable object from another thread. If the operation
376  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
377  * operation was partially finished when the operation was cancelled the
378  * partial result will be returned, without an error.
379  *
380  * On error -1 is returned and @error is set accordingly.
381  * 
382  * For the asynchronous, non-blocking, version of this function, see 
383  * g_buffered_input_stream_fill_async().
384  *
385  * Returns: the number of bytes read into @stream's buffer, up to @count, 
386  *     or -1 on error.
387  **/
388 gssize
389 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
390                               gssize                 count,
391                               GCancellable          *cancellable,
392                               GError               **error)
393 {
394   GBufferedInputStreamClass *class;
395   GInputStream *input_stream;
396   gssize res;
397
398   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
399   
400   input_stream = G_INPUT_STREAM (stream);
401   
402   if (g_input_stream_is_closed (input_stream))
403     {
404       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
405                    _("Stream is already closed"));
406       return -1;
407     }
408   
409   if (g_input_stream_has_pending (input_stream))
410     {
411       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
412                    _("Stream has outstanding operation"));
413       return -1;
414     }
415       
416   g_input_stream_set_pending (input_stream, TRUE);
417
418   if (cancellable)
419     g_push_current_cancellable (cancellable);
420   
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_pop_current_cancellable (cancellable);
426   
427   g_input_stream_set_pending (input_stream, FALSE);
428   
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: #GBufferedInputStream.
447  * @count: a #gssize.
448  * @io_priority: the io priority of the request.
449  * @cancellable: optional #GCancellable object
450  * @callback: a #GAsyncReadyCallback.
451  * @user_data: a #gpointer.
452  *
453  * Reads data into @stream's buffer asynchronously, up to @count size.
454  * @io_priority can be used to prioritize reads. For the synchronous
455  * version of this function, see g_buffered_input_stream_fill().
456  **/
457 void
458 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
459                                     gssize                count,
460                                     int                   io_priority,
461                                     GCancellable         *cancellable,
462                                     GAsyncReadyCallback   callback,
463                                     gpointer              user_data)
464 {
465   GBufferedInputStreamClass *class;
466   GSimpleAsyncResult *simple;
467
468   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
469
470   if (count == 0)
471     {
472       simple = g_simple_async_result_new (G_OBJECT (stream),
473                                           callback,
474                                           user_data,
475                                           g_buffered_input_stream_fill_async);
476       g_simple_async_result_complete_in_idle (simple);
477       g_object_unref (simple);
478       return;
479     }
480   
481   if (((gssize) count) < 0)
482     {
483       g_simple_async_report_error_in_idle (G_OBJECT (stream),
484                                            callback,
485                                            user_data,
486                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
487                                            _("Too large count value passed to g_input_stream_read_async"));
488       return;
489     }
490   
491   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
492     {
493       g_simple_async_report_error_in_idle (G_OBJECT (stream),
494                                            callback,
495                                            user_data,
496                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
497                                            _("Stream is already closed"));
498       return;
499     }
500     
501   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
502     {
503       g_simple_async_report_error_in_idle (G_OBJECT (stream),
504                                            callback,
505                                            user_data,
506                                            G_IO_ERROR, G_IO_ERROR_PENDING,
507                                            _("Stream has outstanding operation"));
508       return;
509     }
510
511   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
512   
513   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
514   stream->priv->outstanding_callback = callback;
515   g_object_ref (stream);
516   class->fill_async (stream, count, io_priority, cancellable,
517                      async_fill_callback_wrapper, user_data);
518 }
519
520 /**
521  * g_buffered_input_stream_fill_finish:
522  * @stream: a #GBufferedInputStream.
523  * @result: a #GAsyncResult.
524  * @error: a #GError.
525  *
526  * Finishes an asynchronous read.
527  * 
528  * Returns: a #gssize of the read stream, or %-1 on an error. 
529  **/
530 gssize
531 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
532                                      GAsyncResult          *result,
533                                      GError               **error)
534 {
535   GSimpleAsyncResult *simple;
536   GBufferedInputStreamClass *class;
537
538   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
539   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
540
541   if (G_IS_SIMPLE_ASYNC_RESULT (result))
542     {
543       simple = G_SIMPLE_ASYNC_RESULT (result);
544       if (g_simple_async_result_propagate_error (simple, error))
545         return -1;
546
547       /* Special case read of 0 bytes */
548       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
549         return 0;
550     }
551
552   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
553   return class->fill_finish (stream, result, error);
554 }
555
556 /**
557  * g_buffered_input_stream_get_available:
558  * @stream: #GBufferedInputStream.
559  * 
560  * Gets the size of the available data within the stream.
561  * 
562  * Returns: size of the available stream. 
563  **/
564 gsize
565 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
566 {
567   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
568
569   return stream->priv->end - stream->priv->pos;
570 }
571
572 /**
573  * g_buffered_input_stream_peek:
574  * @stream: a #GBufferedInputStream.
575  * @buffer: a pointer to an allocated chunk of memory.
576  * @offset: a #gsize.
577  * @count: a #gsize.
578  * 
579  * Peeks in the buffer, copying data of size @count into @buffer, 
580  * offset @offset bytes.
581  * 
582  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
583  **/
584 gsize
585 g_buffered_input_stream_peek (GBufferedInputStream *stream,
586                               void                 *buffer,
587                               gsize                 offset,
588                               gsize                 count)
589 {
590   gsize available;
591   gsize end;
592   
593   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
594   g_return_val_if_fail (buffer != NULL, -1);
595
596   available = g_buffered_input_stream_get_available (stream);
597
598   if (offset > available)
599     return 0;
600   
601   end = MIN (offset + count, available);
602   count = end - offset;
603   
604   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
605   return count;
606 }
607
608 /**
609  * g_buffered_input_stream_peek_buffer:
610  * @stream: a #GBufferedInputStream.
611  * @count: a #gsize to get the number of bytes available in the buffer.
612  *
613  * Returns the buffer with the currently available bytes. The returned
614  * buffer must not be modified and will become invalid when reading from
615  * the stream or filling the buffer.
616  *
617  * Returns: read-only buffer
618  **/
619 const void*
620 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
621                                      gsize                *count)
622 {
623   GBufferedInputStreamPrivate *priv;
624
625   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
626
627   priv = stream->priv;
628
629   if (count) 
630     *count = priv->end - priv->pos;
631
632   return priv->buffer + priv->pos;
633 }
634
635 static void
636 compact_buffer (GBufferedInputStream *stream)
637 {
638   GBufferedInputStreamPrivate *priv;
639   gsize current_size;
640
641   priv = stream->priv;
642
643   current_size = priv->end - priv->pos;
644   
645   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
646   
647   priv->pos = 0;
648   priv->end = current_size;
649 }
650
651 static gssize
652 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
653                                    gssize                 count,
654                                    GCancellable          *cancellable,
655                                    GError               **error)
656 {
657   GBufferedInputStreamPrivate *priv;
658   GInputStream *base_stream;
659   gssize nread;
660   gsize in_buffer;
661
662   priv = stream->priv;
663
664   if (count == -1)
665     count = priv->len;
666   
667   in_buffer = priv->end - priv->pos;
668
669   /* Never fill more than can fit in the buffer */
670   count = MIN (count, priv->len - in_buffer);
671
672   /* If requested length does not fit at end, compact */
673   if (priv->len - priv->end < count)
674     compact_buffer (stream);
675
676   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
677   nread = g_input_stream_read (base_stream,
678                                priv->buffer + priv->end,
679                                count,
680                                cancellable,
681                                error);
682
683   if (nread > 0)
684     priv->end += nread;
685   
686   return nread;
687 }
688
689 static gssize
690 g_buffered_input_stream_skip (GInputStream  *stream,
691                               gsize          count,
692                               GCancellable  *cancellable,
693                               GError       **error)
694 {
695   GBufferedInputStream        *bstream;
696   GBufferedInputStreamPrivate *priv;
697   GInputStream *base_stream;
698   gsize available, bytes_skipped;
699   gssize nread;
700
701   bstream = G_BUFFERED_INPUT_STREAM (stream);
702   priv = bstream->priv;
703
704   available = priv->end - priv->pos;
705
706   if (count <= available)
707     {
708       priv->pos += count;
709       return count;
710     }
711
712   /* Full request not available, skip all currently available and 
713    * request refill for more 
714    */
715   
716   priv->pos = 0;
717   priv->end = 0;
718   bytes_skipped = available;
719   count -= available;
720
721   if (bytes_skipped > 0)
722     error = NULL; /* Ignore further errors if we already read some data */
723   
724   if (count > priv->len)
725     {
726       /* Large request, shortcut buffer */
727       
728       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
729
730       nread = g_input_stream_skip (base_stream,
731                                    count,
732                                    cancellable,
733                                    error);
734       
735       if (nread < 0 && bytes_skipped == 0)
736         return -1;
737       
738       if (nread > 0)
739         bytes_skipped += nread;
740       
741       return bytes_skipped;
742     }
743   
744   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
745   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
746   g_input_stream_set_pending (stream, TRUE); /* enable again */
747   
748   if (nread < 0)
749     {
750       if (bytes_skipped == 0)
751         return -1;
752       else
753         return bytes_skipped;
754     }
755   
756   available = priv->end - priv->pos;
757   count = MIN (count, available);
758   
759   bytes_skipped += count;
760   priv->pos += count;
761   
762   return bytes_skipped;
763 }
764
765 static gssize
766 g_buffered_input_stream_read (GInputStream *stream,
767                               void         *buffer,
768                               gsize         count,
769                               GCancellable *cancellable,
770                               GError      **error)
771 {
772   GBufferedInputStream        *bstream;
773   GBufferedInputStreamPrivate *priv;
774   GInputStream *base_stream;
775   gsize available, bytes_read;
776   gssize nread;
777
778   bstream = G_BUFFERED_INPUT_STREAM (stream);
779   priv = bstream->priv;
780
781   available = priv->end - priv->pos;
782
783   if (count <= available)
784     {
785       memcpy (buffer, priv->buffer + priv->pos, count);
786       priv->pos += count;
787       return count;
788     }
789   
790   /* Full request not available, read all currently availbile and request refill for more */
791   
792   memcpy (buffer, priv->buffer + priv->pos, available);
793   priv->pos = 0;
794   priv->end = 0;
795   bytes_read = available;
796   count -= available;
797
798   if (bytes_read > 0)
799     error = NULL; /* Ignore further errors if we already read some data */
800   
801   if (count > priv->len)
802     {
803       /* Large request, shortcut buffer */
804       
805       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
806
807       nread = g_input_stream_read (base_stream,
808                                    (char *)buffer + bytes_read,
809                                    count,
810                                    cancellable,
811                                    error);
812       
813       if (nread < 0 && bytes_read == 0)
814         return -1;
815       
816       if (nread > 0)
817         bytes_read += nread;
818       
819       return bytes_read;
820     }
821   
822   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
823   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
824   g_input_stream_set_pending (stream, TRUE); /* enable again */
825   if (nread < 0)
826     {
827       if (bytes_read == 0)
828         return -1;
829       else
830         return bytes_read;
831     }
832   
833   available = priv->end - priv->pos;
834   count = MIN (count, available);
835   
836   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
837   bytes_read += count;
838   priv->pos += count;
839   
840   return bytes_read;
841 }
842
843 /**
844  * g_buffered_input_stream_read_byte:
845  * @stream: #GBufferedInputStream.
846  * @cancellable: optional #GCancellable object, %NULL to ignore.
847  * @error: location to store the error occuring, or %NULL to ignore.
848  *
849  * Tries to read a single byte from the stream or the buffer. Will block
850  * during this read.
851  *
852  * On success, the byte read from the stream is returned. On end of stream
853  * -1 is returned but it's not an exceptional error and @error is not set.
854  * 
855  * If @cancellable is not %NULL, then the operation can be cancelled by
856  * triggering the cancellable object from another thread. If the operation
857  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
858  * operation was partially finished when the operation was cancelled the
859  * partial result will be returned, without an error.
860  *
861  * On error -1 is returned and @error is set accordingly.
862  * 
863  * Returns: the byte read from the @stream, or -1 on end of stream or error.
864  **/
865 int
866 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
867                                    GCancellable          *cancellable,
868                                    GError               **error)
869 {
870   GBufferedInputStreamPrivate *priv;
871   GInputStream *input_stream;
872   gsize available;
873   gssize nread;
874
875   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
876
877   priv = stream->priv;
878   input_stream = G_INPUT_STREAM (stream);
879
880   if (g_input_stream_is_closed (input_stream))
881     {
882       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
883        _("Stream is already closed"));
884       return -1;
885     }
886
887   if (g_input_stream_has_pending (input_stream))
888     {
889       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
890        _("Stream has outstanding operation"));
891       return -1;
892     }
893
894   available = priv->end - priv->pos;
895
896   if (available < 1)
897     return priv->buffer[priv->pos++];
898
899   /* Byte not available, request refill for more */
900
901   if (cancellable)
902     g_push_current_cancellable (cancellable);
903
904   priv->pos = 0;
905   priv->end = 0;
906
907   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
908
909   if (cancellable)
910     g_pop_current_cancellable (cancellable);
911
912   if (nread <= 0)
913     return -1; /* error or end of stream */
914
915   return priv->buffer[priv->pos++];
916 }
917
918 /* ************************** */
919 /* Async stuff implementation */
920 /* ************************** */
921
922 static void
923 fill_async_callback (GObject      *source_object,
924                      GAsyncResult *result,
925                      gpointer      user_data)
926 {
927   GError *error;
928   gssize res;
929   GSimpleAsyncResult *simple;
930
931   simple = user_data;
932   
933   error = NULL;
934   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
935                                     result, &error);
936
937   g_simple_async_result_set_op_res_gssize (simple, res);
938   if (res == -1)
939     {
940       g_simple_async_result_set_from_error (simple, error);
941       g_error_free (error);
942     }
943   
944   /* Complete immediately, not in idle, since we're already in a mainloop callout */
945   g_simple_async_result_complete (simple);
946   g_object_unref (simple);
947 }
948
949 static void
950 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
951                                          gssize                count,
952                                          int                   io_priority,
953                                          GCancellable         *cancellable,
954                                          GAsyncReadyCallback   callback,
955                                          gpointer              user_data)
956 {
957   GBufferedInputStreamPrivate *priv;
958   GInputStream *base_stream;
959   GSimpleAsyncResult *simple;
960   gsize in_buffer;
961
962   priv = stream->priv;
963
964   if (count == -1)
965     count = priv->len;
966   
967   in_buffer = priv->end - priv->pos;
968
969   /* Never fill more than can fit in the buffer */
970   count = MIN (count, priv->len - in_buffer);
971
972   /* If requested length does not fit at end, compact */
973   if (priv->len - priv->end < count)
974     compact_buffer (stream);
975
976   simple = g_simple_async_result_new (G_OBJECT (stream),
977                                       callback, user_data,
978                                       g_buffered_input_stream_real_fill_async);
979   
980   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
981   g_input_stream_read_async (base_stream,
982                              priv->buffer + priv->end,
983                              count,
984                              io_priority,
985                              cancellable,
986                              fill_async_callback,
987                              simple);
988 }
989
990 static gssize
991 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
992                                           GAsyncResult         *result,
993                                           GError              **error)
994 {
995   GSimpleAsyncResult *simple;
996   gssize nread;
997
998   simple = G_SIMPLE_ASYNC_RESULT (result);
999   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1000   
1001   nread = g_simple_async_result_get_op_res_gssize (simple);
1002   return nread;
1003 }
1004
1005 typedef struct {
1006   gssize bytes_read;
1007   gssize count;
1008   void *buffer;
1009 } ReadAsyncData;
1010
1011 static void 
1012 free_read_async_data (gpointer _data)
1013 {
1014   ReadAsyncData *data = _data;
1015   g_slice_free (ReadAsyncData, data);
1016 }
1017
1018 static void
1019 large_read_callback (GObject *source_object,
1020                      GAsyncResult *result,
1021                      gpointer user_data)
1022 {
1023   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1024   ReadAsyncData *data;
1025   GError *error;
1026   gssize nread;
1027
1028   data = g_simple_async_result_get_op_res_gpointer (simple);
1029   
1030   error = NULL;
1031   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1032                                       result, &error);
1033
1034   /* Only report the error if we've not already read some data */
1035   if (nread < 0 && data->bytes_read == 0)
1036     g_simple_async_result_set_from_error (simple, error);
1037   
1038   if (nread > 0)
1039     data->bytes_read += nread;
1040   
1041   if (error)
1042     g_error_free (error);
1043   
1044   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1045   g_simple_async_result_complete (simple);
1046   g_object_unref (simple);
1047 }
1048
1049 static void
1050 read_fill_buffer_callback (GObject *source_object,
1051                            GAsyncResult *result,
1052                            gpointer user_data)
1053 {
1054   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1055   GBufferedInputStream *bstream;
1056   GBufferedInputStreamPrivate *priv;
1057   ReadAsyncData *data;
1058   GError *error;
1059   gssize nread;
1060   gsize available;
1061
1062   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1063   priv = bstream->priv;
1064   
1065   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1066   
1067   data = g_simple_async_result_get_op_res_gpointer (simple);
1068   
1069   error = NULL;
1070   nread = g_buffered_input_stream_fill_finish (bstream,
1071                                                result, &error);
1072   
1073   if (nread < 0 && data->bytes_read == 0)
1074     g_simple_async_result_set_from_error (simple, error);
1075
1076
1077   if (nread > 0)
1078     {
1079       available = priv->end - priv->pos;
1080       data->count = MIN (data->count, available);
1081       
1082       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1083       data->bytes_read += data->count;
1084       priv->pos += data->count;
1085     }
1086
1087   if (error)
1088     g_error_free (error);
1089   
1090   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1091   g_simple_async_result_complete (simple);
1092   g_object_unref (simple);
1093 }
1094
1095 static void
1096 g_buffered_input_stream_read_async (GInputStream              *stream,
1097                                     void                      *buffer,
1098                                     gsize                      count,
1099                                     int                        io_priority,
1100                                     GCancellable              *cancellable,
1101                                     GAsyncReadyCallback        callback,
1102                                     gpointer                   user_data)
1103 {
1104   GBufferedInputStream *bstream;
1105   GBufferedInputStreamPrivate *priv;
1106   GInputStream *base_stream;
1107   gsize available;
1108   GSimpleAsyncResult *simple;
1109   ReadAsyncData *data;
1110
1111   bstream = G_BUFFERED_INPUT_STREAM (stream);
1112   priv = bstream->priv;
1113
1114   data = g_slice_new (ReadAsyncData);
1115   data->buffer = buffer;
1116   data->bytes_read = 0;
1117   simple = g_simple_async_result_new (G_OBJECT (stream),
1118                                       callback, user_data,
1119                                       g_buffered_input_stream_read_async);
1120   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1121   
1122   available = priv->end - priv->pos;
1123   
1124   if (count <= available)
1125     {
1126       memcpy (buffer, priv->buffer + priv->pos, count);
1127       priv->pos += count;
1128       data->bytes_read = count;
1129       
1130       g_simple_async_result_complete_in_idle (simple);
1131       g_object_unref (simple);
1132       return;
1133     }
1134
1135
1136   /* Full request not available, read all currently availbile and request refill for more */
1137   
1138   memcpy (buffer, priv->buffer + priv->pos, available);
1139   priv->pos = 0;
1140   priv->end = 0;
1141   
1142   count -= available;
1143   
1144   data->bytes_read = available;
1145   data->count = count;
1146
1147   if (count > priv->len)
1148     {
1149       /* Large request, shortcut buffer */
1150       
1151       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1152       
1153       g_input_stream_read_async (base_stream,
1154                                  (char *)buffer + data->bytes_read,
1155                                  count,
1156                                  io_priority, cancellable,
1157                                  large_read_callback,
1158                                  simple);
1159     }
1160   else
1161     {
1162       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1163       g_buffered_input_stream_fill_async (bstream, priv->len,
1164                                           io_priority, cancellable,
1165                                           read_fill_buffer_callback, simple);
1166     }
1167 }
1168
1169 static gssize
1170 g_buffered_input_stream_read_finish (GInputStream   *stream,
1171                                      GAsyncResult   *result,
1172                                      GError        **error)
1173 {
1174   GSimpleAsyncResult *simple;
1175   ReadAsyncData *data;
1176   
1177   simple = G_SIMPLE_ASYNC_RESULT (result);
1178   
1179   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1180
1181   data = g_simple_async_result_get_op_res_gpointer (simple);
1182   
1183   return data->bytes_read;
1184 }
1185
1186 typedef struct {
1187   gssize bytes_skipped;
1188   gssize count;
1189 } SkipAsyncData;
1190
1191 static void 
1192 free_skip_async_data (gpointer _data)
1193 {
1194   SkipAsyncData *data = _data;
1195   g_slice_free (SkipAsyncData, data);
1196 }
1197
1198 static void
1199 large_skip_callback (GObject *source_object,
1200                      GAsyncResult *result,
1201                      gpointer user_data)
1202 {
1203   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1204   SkipAsyncData *data;
1205   GError *error;
1206   gssize nread;
1207
1208   data = g_simple_async_result_get_op_res_gpointer (simple);
1209   
1210   error = NULL;
1211   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1212                                       result, &error);
1213
1214   /* Only report the error if we've not already read some data */
1215   if (nread < 0 && data->bytes_skipped == 0)
1216     g_simple_async_result_set_from_error (simple, error);
1217   
1218   if (nread > 0)
1219     data->bytes_skipped += nread;
1220   
1221   if (error)
1222     g_error_free (error);
1223   
1224   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1225   g_simple_async_result_complete (simple);
1226   g_object_unref (simple);
1227 }
1228
1229 static void
1230 skip_fill_buffer_callback (GObject *source_object,
1231                            GAsyncResult *result,
1232                            gpointer user_data)
1233 {
1234   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1235   GBufferedInputStream *bstream;
1236   GBufferedInputStreamPrivate *priv;
1237   SkipAsyncData *data;
1238   GError *error;
1239   gssize nread;
1240   gsize available;
1241
1242   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1243   priv = bstream->priv;
1244   
1245   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1246   
1247   data = g_simple_async_result_get_op_res_gpointer (simple);
1248   
1249   error = NULL;
1250   nread = g_buffered_input_stream_fill_finish (bstream,
1251                                                result, &error);
1252   
1253   if (nread < 0 && data->bytes_skipped == 0)
1254     g_simple_async_result_set_from_error (simple, error);
1255
1256
1257   if (nread > 0)
1258     {
1259       available = priv->end - priv->pos;
1260       data->count = MIN (data->count, available);
1261       
1262       data->bytes_skipped += data->count;
1263       priv->pos += data->count;
1264     }
1265
1266   if (error)
1267     g_error_free (error);
1268   
1269   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1270   g_simple_async_result_complete (simple);
1271   g_object_unref (simple);
1272 }
1273
1274 static void
1275 g_buffered_input_stream_skip_async (GInputStream              *stream,
1276                                     gsize                      count,
1277                                     int                        io_priority,
1278                                     GCancellable              *cancellable,
1279                                     GAsyncReadyCallback        callback,
1280                                     gpointer                   user_data)
1281 {
1282   GBufferedInputStream *bstream;
1283   GBufferedInputStreamPrivate *priv;
1284   GInputStream *base_stream;
1285   gsize available;
1286   GSimpleAsyncResult *simple;
1287   SkipAsyncData *data;
1288
1289   bstream = G_BUFFERED_INPUT_STREAM (stream);
1290   priv = bstream->priv;
1291
1292   data = g_slice_new (SkipAsyncData);
1293   data->bytes_skipped = 0;
1294   simple = g_simple_async_result_new (G_OBJECT (stream),
1295                                       callback, user_data,
1296                                       g_buffered_input_stream_skip_async);
1297   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1298   
1299   available = priv->end - priv->pos;
1300   
1301   if (count <= available)
1302     {
1303       priv->pos += count;
1304       data->bytes_skipped = count;
1305       
1306       g_simple_async_result_complete_in_idle (simple);
1307       g_object_unref (simple);
1308       return;
1309     }
1310
1311
1312   /* Full request not available, skip all currently availbile and request refill for more */
1313   
1314   priv->pos = 0;
1315   priv->end = 0;
1316   
1317   count -= available;
1318   
1319   data->bytes_skipped = available;
1320   data->count = count;
1321
1322   if (count > priv->len)
1323     {
1324       /* Large request, shortcut buffer */
1325       
1326       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1327       
1328       g_input_stream_skip_async (base_stream,
1329                                  count,
1330                                  io_priority, cancellable,
1331                                  large_skip_callback,
1332                                  simple);
1333     }
1334   else
1335     {
1336       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1337       g_buffered_input_stream_fill_async (bstream, priv->len,
1338                                           io_priority, cancellable,
1339                                           skip_fill_buffer_callback, simple);
1340     }
1341 }
1342
1343 static gssize
1344 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1345                                      GAsyncResult   *result,
1346                                      GError        **error)
1347 {
1348   GSimpleAsyncResult *simple;
1349   SkipAsyncData *data;
1350   
1351   simple = G_SIMPLE_ASYNC_RESULT (result);
1352   
1353   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1354
1355   data = g_simple_async_result_get_op_res_gpointer (simple);
1356   
1357   return data->bytes_skipped;
1358 }
1359
1360
1361 #define __G_BUFFERED_INPUT_STREAM_C__
1362 #include "gioaliasdef.c"