Clarify docs for g_memory_output_stream_get_size. Add
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @include: gio/gio.h
37  * @see_also: #GFilterInputStream, #GInputStream
38  * 
39  * Buffered input stream implements #GFilterInputStream and provides 
40  * for buffered reads. 
41  * 
42  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
43  * 
44  * To create a buffered input stream, use g_buffered_input_stream_new(), 
45  * or g_buffered_input_stream_new_sized() to specify the buffer's size at 
46  * construction.
47  * 
48  * To get the size of a buffer within a buffered input stream, use 
49  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
50  * buffered input stream's buffer, use
51  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 
52  * cannot be reduced below the size of the data within the buffer.
53  *
54  **/
55
56
57
58 #define DEFAULT_BUFFER_SIZE 4096
59
60 struct _GBufferedInputStreamPrivate {
61   guint8 *buffer;
62   gsize   len;
63   gsize   pos;
64   gsize   end;
65   GAsyncReadyCallback outstanding_callback;
66 };
67
68 enum {
69   PROP_0,
70   PROP_BUFSIZE
71 };
72
73 static void g_buffered_input_stream_set_property  (GObject      *object,
74                                                    guint         prop_id,
75                                                    const GValue *value,
76                                                    GParamSpec   *pspec);
77
78 static void g_buffered_input_stream_get_property  (GObject      *object,
79                                                    guint         prop_id,
80                                                    GValue       *value,
81                                                    GParamSpec   *pspec);
82 static void g_buffered_input_stream_finalize      (GObject *object);
83
84
85 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
86                                                         gsize                  count,
87                                                         GCancellable          *cancellable,
88                                                         GError               **error);
89 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
90                                                         gsize                  count,
91                                                         int                    io_priority,
92                                                         GCancellable          *cancellable,
93                                                         GAsyncReadyCallback    callback,
94                                                         gpointer               user_data);
95 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
96                                                         GAsyncResult          *result,
97                                                         GError               **error);
98 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
99                                                         void                  *buffer,
100                                                         gsize                  count,
101                                                         GCancellable          *cancellable,
102                                                         GError               **error);
103 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
104                                                         void                  *buffer,
105                                                         gsize                  count,
106                                                         int                    io_priority,
107                                                         GCancellable          *cancellable,
108                                                         GAsyncReadyCallback    callback,
109                                                         gpointer               user_data);
110 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
111                                                         GAsyncResult          *result,
112                                                         GError               **error);
113 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
114                                                         gssize                 count,
115                                                         GCancellable          *cancellable,
116                                                         GError               **error);
117 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
118                                                         gssize                 count,
119                                                         int                    io_priority,
120                                                         GCancellable          *cancellable,
121                                                         GAsyncReadyCallback    callback,
122                                                         gpointer               user_data);
123 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
124                                                         GAsyncResult          *result,
125                                                         GError               **error);
126
127 static void compact_buffer (GBufferedInputStream *stream);
128
129 G_DEFINE_TYPE (GBufferedInputStream,
130                g_buffered_input_stream,
131                G_TYPE_FILTER_INPUT_STREAM)
132
133
134 static void
135 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
136 {
137   GObjectClass *object_class;
138   GInputStreamClass *istream_class;
139   GBufferedInputStreamClass *bstream_class;
140
141   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
142
143   object_class = G_OBJECT_CLASS (klass);
144   object_class->get_property = g_buffered_input_stream_get_property;
145   object_class->set_property = g_buffered_input_stream_set_property;
146   object_class->finalize     = g_buffered_input_stream_finalize;
147
148   istream_class = G_INPUT_STREAM_CLASS (klass);
149   istream_class->skip = g_buffered_input_stream_skip;
150   istream_class->skip_async  = g_buffered_input_stream_skip_async;
151   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
152   istream_class->read_fn = g_buffered_input_stream_read;
153   istream_class->read_async  = g_buffered_input_stream_read_async;
154   istream_class->read_finish = g_buffered_input_stream_read_finish;
155
156   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
157   bstream_class->fill = g_buffered_input_stream_real_fill;
158   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
159   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
160   
161   g_object_class_install_property (object_class,
162                                    PROP_BUFSIZE,
163                                    g_param_spec_uint ("buffer-size",
164                                                       P_("Buffer Size"),
165                                                       P_("The size of the backend buffer"),
166                                                       1,
167                                                       G_MAXUINT,
168                                                       DEFAULT_BUFFER_SIZE,
169                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
170                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172
173 }
174
175 /**
176  * g_buffered_input_stream_get_buffer_size:
177  * @stream: #GBufferedInputStream.
178  * 
179  * Gets the size of the input buffer.
180  * 
181  * Returns: the current buffer size.
182  **/
183 gsize
184 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_input_stream_set_buffer_size:
193  * @stream: #GBufferedInputStream.
194  * @size: a #gsize.
195  *
196  * Sets the size of the internal buffer of @stream to @size, or to the 
197  * size of the contents of the buffer. The buffer can never be resized 
198  * smaller than its current contents.
199  **/
200 void
201 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
202                                          gsize                  size)
203 {
204   GBufferedInputStreamPrivate *priv;
205   gsize in_buffer;
206   guint8 *buffer;
207   
208   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
209
210   priv = stream->priv;
211
212   if (priv->len == size)
213     return;
214
215   if (priv->buffer)
216     {
217       in_buffer = priv->end - priv->pos;
218
219       /* Never resize smaller than current buffer contents */
220       size = MAX (size, in_buffer);
221
222       buffer = g_malloc (size);
223       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
224       priv->len = size;
225       priv->pos = 0;
226       priv->end = in_buffer;
227       g_free (priv->buffer);
228       priv->buffer = buffer;
229     }
230   else
231     {
232       priv->len = size;
233       priv->pos = 0;
234       priv->end = 0;
235       priv->buffer = g_malloc (size);
236     }
237
238   g_object_notify (G_OBJECT (stream), "buffer-size");
239 }
240
241 static void
242 g_buffered_input_stream_set_property (GObject      *object,
243                                       guint         prop_id,
244                                       const GValue *value,
245                                       GParamSpec   *pspec)
246 {
247   GBufferedInputStreamPrivate *priv;
248   GBufferedInputStream        *bstream;
249
250   bstream = G_BUFFERED_INPUT_STREAM (object);
251   priv = bstream->priv;
252
253   switch (prop_id) 
254     {
255     case PROP_BUFSIZE:
256       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
257       break;
258
259     default:
260       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
261       break;
262     }
263
264 }
265
266 static void
267 g_buffered_input_stream_get_property (GObject    *object,
268                                       guint       prop_id,
269                                       GValue     *value,
270                                       GParamSpec *pspec)
271 {
272   GBufferedInputStreamPrivate *priv;
273   GBufferedInputStream        *bstream;
274
275   bstream = G_BUFFERED_INPUT_STREAM (object);
276   priv = bstream->priv;
277
278   switch (prop_id)
279     { 
280     case PROP_BUFSIZE:
281       g_value_set_uint (value, priv->len);
282       break;
283
284     default:
285       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
286       break;
287     }
288 }
289
290 static void
291 g_buffered_input_stream_finalize (GObject *object)
292 {
293   GBufferedInputStreamPrivate *priv;
294   GBufferedInputStream        *stream;
295
296   stream = G_BUFFERED_INPUT_STREAM (object);
297   priv = stream->priv;
298
299   g_free (priv->buffer);
300
301   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
302     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
303 }
304
305 static void
306 g_buffered_input_stream_init (GBufferedInputStream *stream)
307 {
308   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
309                                               G_TYPE_BUFFERED_INPUT_STREAM,
310                                               GBufferedInputStreamPrivate);
311 }
312
313
314 /**
315  * g_buffered_input_stream_new:
316  * @base_stream: a #GInputStream.
317  * 
318  * Creates a new #GInputStream from the given @base_stream, with 
319  * a buffer set to the default size (4 kilobytes).
320  *
321  * Returns: a #GInputStream for the given @base_stream.
322  **/
323 GInputStream *
324 g_buffered_input_stream_new (GInputStream *base_stream)
325 {
326   GInputStream *stream;
327
328   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
329
330   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
331                          "base-stream", base_stream,
332                          NULL);
333
334   return stream;
335 }
336
337 /**
338  * g_buffered_input_stream_new_sized:
339  * @base_stream: a #GInputStream.
340  * @size: a #gsize.
341  * 
342  * Creates a new #GBufferedInputStream from the given @base_stream, 
343  * with a buffer set to @size.
344  *
345  * Returns: a #GInputStream.
346  **/
347 GInputStream *
348 g_buffered_input_stream_new_sized (GInputStream *base_stream,
349                                    gsize         size)
350 {
351   GInputStream *stream;
352
353   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
354
355   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
356                          "base-stream", base_stream,
357                          "buffer-size", (guint)size,
358                          NULL);
359
360   return stream;
361 }
362
363 /**
364  * g_buffered_input_stream_fill:
365  * @stream: #GBufferedInputStream.
366  * @count: the number of bytes that will be read from the stream.
367  * @cancellable: optional #GCancellable object, %NULL to ignore.
368  * @error: location to store the error occuring, or %NULL to ignore.
369  *
370  * Tries to read @count bytes from the stream into the buffer. 
371  * Will block during this read.
372  * 
373  * If @count is zero, returns zero and does nothing. A value of @count
374  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
375  *
376  * On success, the number of bytes read into the buffer is returned.
377  * It is not an error if this is not the same as the requested size, as it
378  * can happen e.g. near the end of a file. Zero is returned on end of file
379  * (or if @count is zero),  but never otherwise.
380  *
381  * If @cancellable is not %NULL, then the operation can be cancelled by
382  * triggering the cancellable object from another thread. If the operation
383  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
384  * operation was partially finished when the operation was cancelled the
385  * partial result will be returned, without an error.
386  *
387  * On error -1 is returned and @error is set accordingly.
388  * 
389  * For the asynchronous, non-blocking, version of this function, see 
390  * g_buffered_input_stream_fill_async().
391  *
392  * Returns: the number of bytes read into @stream's buffer, up to @count, 
393  *     or -1 on error.
394  **/
395 gssize
396 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
397                               gssize                 count,
398                               GCancellable          *cancellable,
399                               GError               **error)
400 {
401   GBufferedInputStreamClass *class;
402   GInputStream *input_stream;
403   gssize res;
404
405   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
406   
407   input_stream = G_INPUT_STREAM (stream);
408   
409   if (!g_input_stream_set_pending (input_stream, error))
410     return -1;
411
412   if (cancellable)
413     g_cancellable_push_current (cancellable);
414   
415   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
416   res = class->fill (stream, count, cancellable, error);
417
418   if (cancellable)
419     g_cancellable_pop_current (cancellable);
420   
421   g_input_stream_clear_pending (input_stream);
422   
423   return res;
424 }
425
426 static void
427 async_fill_callback_wrapper (GObject      *source_object,
428                              GAsyncResult *res,
429                              gpointer      user_data)
430 {
431   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
432
433   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
434   (*stream->priv->outstanding_callback) (source_object, res, user_data);
435   g_object_unref (stream);
436 }
437
438 /**
439  * g_buffered_input_stream_fill_async:
440  * @stream: #GBufferedInputStream.
441  * @count: a #gssize.
442  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
443  *     of the request.
444  * @cancellable: optional #GCancellable object
445  * @callback: a #GAsyncReadyCallback.
446  * @user_data: a #gpointer.
447  *
448  * Reads data into @stream's buffer asynchronously, up to @count size.
449  * @io_priority can be used to prioritize reads. For the synchronous
450  * version of this function, see g_buffered_input_stream_fill().
451  **/
452 void
453 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
454                                     gssize                count,
455                                     int                   io_priority,
456                                     GCancellable         *cancellable,
457                                     GAsyncReadyCallback   callback,
458                                     gpointer              user_data)
459 {
460   GBufferedInputStreamClass *class;
461   GSimpleAsyncResult *simple;
462   GError *error = NULL;
463
464   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
465
466   if (count == 0)
467     {
468       simple = g_simple_async_result_new (G_OBJECT (stream),
469                                           callback,
470                                           user_data,
471                                           g_buffered_input_stream_fill_async);
472       g_simple_async_result_complete_in_idle (simple);
473       g_object_unref (simple);
474       return;
475     }
476   
477   if (((gssize) count) < 0)
478     {
479       g_simple_async_report_error_in_idle (G_OBJECT (stream),
480                                            callback,
481                                            user_data,
482                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
483                                            _("Too large count value passed to %s"),
484                                            G_STRFUNC);
485       return;
486     }
487   
488   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
489     {
490       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
491                                             callback,
492                                             user_data,
493                                             error);
494       g_error_free (error);
495       return;
496     }
497     
498   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
499   
500   stream->priv->outstanding_callback = callback;
501   g_object_ref (stream);
502   class->fill_async (stream, count, io_priority, cancellable,
503                      async_fill_callback_wrapper, user_data);
504 }
505
506 /**
507  * g_buffered_input_stream_fill_finish:
508  * @stream: a #GBufferedInputStream.
509  * @result: a #GAsyncResult.
510  * @error: a #GError.
511  *
512  * Finishes an asynchronous read.
513  * 
514  * Returns: a #gssize of the read stream, or %-1 on an error. 
515  **/
516 gssize
517 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
518                                      GAsyncResult          *result,
519                                      GError               **error)
520 {
521   GSimpleAsyncResult *simple;
522   GBufferedInputStreamClass *class;
523
524   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
525   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
526
527   if (G_IS_SIMPLE_ASYNC_RESULT (result))
528     {
529       simple = G_SIMPLE_ASYNC_RESULT (result);
530       if (g_simple_async_result_propagate_error (simple, error))
531         return -1;
532
533       /* Special case read of 0 bytes */
534       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
535         return 0;
536     }
537
538   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
539   return class->fill_finish (stream, result, error);
540 }
541
542 /**
543  * g_buffered_input_stream_get_available:
544  * @stream: #GBufferedInputStream.
545  * 
546  * Gets the size of the available data within the stream.
547  * 
548  * Returns: size of the available stream. 
549  **/
550 gsize
551 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
552 {
553   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
554
555   return stream->priv->end - stream->priv->pos;
556 }
557
558 /**
559  * g_buffered_input_stream_peek:
560  * @stream: a #GBufferedInputStream.
561  * @buffer: a pointer to an allocated chunk of memory.
562  * @offset: a #gsize.
563  * @count: a #gsize.
564  * 
565  * Peeks in the buffer, copying data of size @count into @buffer, 
566  * offset @offset bytes.
567  * 
568  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
569  **/
570 gsize
571 g_buffered_input_stream_peek (GBufferedInputStream *stream,
572                               void                 *buffer,
573                               gsize                 offset,
574                               gsize                 count)
575 {
576   gsize available;
577   gsize end;
578   
579   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
580   g_return_val_if_fail (buffer != NULL, -1);
581
582   available = g_buffered_input_stream_get_available (stream);
583
584   if (offset > available)
585     return 0;
586   
587   end = MIN (offset + count, available);
588   count = end - offset;
589   
590   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
591   return count;
592 }
593
594 /**
595  * g_buffered_input_stream_peek_buffer:
596  * @stream: a #GBufferedInputStream.
597  * @count: a #gsize to get the number of bytes available in the buffer.
598  *
599  * Returns the buffer with the currently available bytes. The returned
600  * buffer must not be modified and will become invalid when reading from
601  * the stream or filling the buffer.
602  *
603  * Returns: read-only buffer
604  **/
605 const void*
606 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
607                                      gsize                *count)
608 {
609   GBufferedInputStreamPrivate *priv;
610
611   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
612
613   priv = stream->priv;
614
615   if (count) 
616     *count = priv->end - priv->pos;
617
618   return priv->buffer + priv->pos;
619 }
620
621 static void
622 compact_buffer (GBufferedInputStream *stream)
623 {
624   GBufferedInputStreamPrivate *priv;
625   gsize current_size;
626
627   priv = stream->priv;
628
629   current_size = priv->end - priv->pos;
630   
631   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
632   
633   priv->pos = 0;
634   priv->end = current_size;
635 }
636
637 static gssize
638 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
639                                    gssize                 count,
640                                    GCancellable          *cancellable,
641                                    GError               **error)
642 {
643   GBufferedInputStreamPrivate *priv;
644   GInputStream *base_stream;
645   gssize nread;
646   gsize in_buffer;
647
648   priv = stream->priv;
649
650   if (count == -1)
651     count = priv->len;
652   
653   in_buffer = priv->end - priv->pos;
654
655   /* Never fill more than can fit in the buffer */
656   count = MIN (count, priv->len - in_buffer);
657
658   /* If requested length does not fit at end, compact */
659   if (priv->len - priv->end < count)
660     compact_buffer (stream);
661
662   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
663   nread = g_input_stream_read (base_stream,
664                                priv->buffer + priv->end,
665                                count,
666                                cancellable,
667                                error);
668
669   if (nread > 0)
670     priv->end += nread;
671   
672   return nread;
673 }
674
675 static gssize
676 g_buffered_input_stream_skip (GInputStream  *stream,
677                               gsize          count,
678                               GCancellable  *cancellable,
679                               GError       **error)
680 {
681   GBufferedInputStream        *bstream;
682   GBufferedInputStreamPrivate *priv;
683   GBufferedInputStreamClass *class;
684   GInputStream *base_stream;
685   gsize available, bytes_skipped;
686   gssize nread;
687
688   bstream = G_BUFFERED_INPUT_STREAM (stream);
689   priv = bstream->priv;
690
691   available = priv->end - priv->pos;
692
693   if (count <= available)
694     {
695       priv->pos += count;
696       return count;
697     }
698
699   /* Full request not available, skip all currently available and 
700    * request refill for more 
701    */
702   
703   priv->pos = 0;
704   priv->end = 0;
705   bytes_skipped = available;
706   count -= available;
707
708   if (bytes_skipped > 0)
709     error = NULL; /* Ignore further errors if we already read some data */
710   
711   if (count > priv->len)
712     {
713       /* Large request, shortcut buffer */
714       
715       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
716
717       nread = g_input_stream_skip (base_stream,
718                                    count,
719                                    cancellable,
720                                    error);
721       
722       if (nread < 0 && bytes_skipped == 0)
723         return -1;
724       
725       if (nread > 0)
726         bytes_skipped += nread;
727       
728       return bytes_skipped;
729     }
730   
731   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
732   nread = class->fill (bstream, priv->len, cancellable, error);
733   
734   if (nread < 0)
735     {
736       if (bytes_skipped == 0)
737         return -1;
738       else
739         return bytes_skipped;
740     }
741   
742   available = priv->end - priv->pos;
743   count = MIN (count, available);
744   
745   bytes_skipped += count;
746   priv->pos += count;
747   
748   return bytes_skipped;
749 }
750
751 static gssize
752 g_buffered_input_stream_read (GInputStream *stream,
753                               void         *buffer,
754                               gsize         count,
755                               GCancellable *cancellable,
756                               GError      **error)
757 {
758   GBufferedInputStream        *bstream;
759   GBufferedInputStreamPrivate *priv;
760   GBufferedInputStreamClass *class;
761   GInputStream *base_stream;
762   gsize available, bytes_read;
763   gssize nread;
764
765   bstream = G_BUFFERED_INPUT_STREAM (stream);
766   priv = bstream->priv;
767
768   available = priv->end - priv->pos;
769
770   if (count <= available)
771     {
772       memcpy (buffer, priv->buffer + priv->pos, count);
773       priv->pos += count;
774       return count;
775     }
776   
777   /* Full request not available, read all currently availbile and request refill for more */
778   
779   memcpy (buffer, priv->buffer + priv->pos, available);
780   priv->pos = 0;
781   priv->end = 0;
782   bytes_read = available;
783   count -= available;
784
785   if (bytes_read > 0)
786     error = NULL; /* Ignore further errors if we already read some data */
787   
788   if (count > priv->len)
789     {
790       /* Large request, shortcut buffer */
791       
792       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
793
794       nread = g_input_stream_read (base_stream,
795                                    (char *)buffer + bytes_read,
796                                    count,
797                                    cancellable,
798                                    error);
799       
800       if (nread < 0 && bytes_read == 0)
801         return -1;
802       
803       if (nread > 0)
804         bytes_read += nread;
805       
806       return bytes_read;
807     }
808   
809   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
810   nread = class->fill (bstream, priv->len, cancellable, error);
811   if (nread < 0)
812     {
813       if (bytes_read == 0)
814         return -1;
815       else
816         return bytes_read;
817     }
818   
819   available = priv->end - priv->pos;
820   count = MIN (count, available);
821   
822   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
823   bytes_read += count;
824   priv->pos += count;
825   
826   return bytes_read;
827 }
828
829 /**
830  * g_buffered_input_stream_read_byte:
831  * @stream: #GBufferedInputStream.
832  * @cancellable: optional #GCancellable object, %NULL to ignore.
833  * @error: location to store the error occuring, or %NULL to ignore.
834  *
835  * Tries to read a single byte from the stream or the buffer. Will block
836  * during this read.
837  *
838  * On success, the byte read from the stream is returned. On end of stream
839  * -1 is returned but it's not an exceptional error and @error is not set.
840  * 
841  * If @cancellable is not %NULL, then the operation can be cancelled by
842  * triggering the cancellable object from another thread. If the operation
843  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
844  * operation was partially finished when the operation was cancelled the
845  * partial result will be returned, without an error.
846  *
847  * On error -1 is returned and @error is set accordingly.
848  * 
849  * Returns: the byte read from the @stream, or -1 on end of stream or error.
850  **/
851 int
852 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
853                                    GCancellable          *cancellable,
854                                    GError               **error)
855 {
856   GBufferedInputStreamPrivate *priv;
857   GBufferedInputStreamClass *class;
858   GInputStream *input_stream;
859   gsize available;
860   gssize nread;
861
862   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
863
864   priv = stream->priv;
865   input_stream = G_INPUT_STREAM (stream);
866
867   if (g_input_stream_is_closed (input_stream))
868     {
869       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
870        _("Stream is already closed"));
871       return -1;
872     }
873
874   if (!g_input_stream_set_pending (input_stream, error))
875     return -1;
876
877   available = priv->end - priv->pos;
878
879   if (available < 1)
880     {
881       g_input_stream_clear_pending (input_stream);
882       return priv->buffer[priv->pos++];
883     }
884
885   /* Byte not available, request refill for more */
886
887   if (cancellable)
888     g_cancellable_push_current (cancellable);
889
890   priv->pos = 0;
891   priv->end = 0;
892
893   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
894   nread = class->fill (stream, priv->len, cancellable, error);
895
896   if (cancellable)
897     g_cancellable_pop_current (cancellable);
898
899   g_input_stream_clear_pending (input_stream);
900
901   if (nread <= 0)
902     return -1; /* error or end of stream */
903
904   return priv->buffer[priv->pos++];
905 }
906
907 /* ************************** */
908 /* Async stuff implementation */
909 /* ************************** */
910
911 static void
912 fill_async_callback (GObject      *source_object,
913                      GAsyncResult *result,
914                      gpointer      user_data)
915 {
916   GError *error;
917   gssize res;
918   GSimpleAsyncResult *simple;
919
920   simple = user_data;
921   
922   error = NULL;
923   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
924                                     result, &error);
925
926   g_simple_async_result_set_op_res_gssize (simple, res);
927   if (res == -1)
928     {
929       g_simple_async_result_set_from_error (simple, error);
930       g_error_free (error);
931     }
932   
933   /* Complete immediately, not in idle, since we're already in a mainloop callout */
934   g_simple_async_result_complete (simple);
935   g_object_unref (simple);
936 }
937
938 static void
939 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
940                                          gssize                count,
941                                          int                   io_priority,
942                                          GCancellable         *cancellable,
943                                          GAsyncReadyCallback   callback,
944                                          gpointer              user_data)
945 {
946   GBufferedInputStreamPrivate *priv;
947   GInputStream *base_stream;
948   GSimpleAsyncResult *simple;
949   gsize in_buffer;
950
951   priv = stream->priv;
952
953   if (count == -1)
954     count = priv->len;
955   
956   in_buffer = priv->end - priv->pos;
957
958   /* Never fill more than can fit in the buffer */
959   count = MIN (count, priv->len - in_buffer);
960
961   /* If requested length does not fit at end, compact */
962   if (priv->len - priv->end < count)
963     compact_buffer (stream);
964
965   simple = g_simple_async_result_new (G_OBJECT (stream),
966                                       callback, user_data,
967                                       g_buffered_input_stream_real_fill_async);
968   
969   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
970   g_input_stream_read_async (base_stream,
971                              priv->buffer + priv->end,
972                              count,
973                              io_priority,
974                              cancellable,
975                              fill_async_callback,
976                              simple);
977 }
978
979 static gssize
980 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
981                                           GAsyncResult         *result,
982                                           GError              **error)
983 {
984   GSimpleAsyncResult *simple;
985   gssize nread;
986
987   simple = G_SIMPLE_ASYNC_RESULT (result);
988   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
989   
990   nread = g_simple_async_result_get_op_res_gssize (simple);
991   return nread;
992 }
993
994 typedef struct {
995   gssize bytes_read;
996   gssize count;
997   void *buffer;
998 } ReadAsyncData;
999
1000 static void 
1001 free_read_async_data (gpointer _data)
1002 {
1003   ReadAsyncData *data = _data;
1004   g_slice_free (ReadAsyncData, data);
1005 }
1006
1007 static void
1008 large_read_callback (GObject *source_object,
1009                      GAsyncResult *result,
1010                      gpointer user_data)
1011 {
1012   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1013   ReadAsyncData *data;
1014   GError *error;
1015   gssize nread;
1016
1017   data = g_simple_async_result_get_op_res_gpointer (simple);
1018   
1019   error = NULL;
1020   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1021                                       result, &error);
1022
1023   /* Only report the error if we've not already read some data */
1024   if (nread < 0 && data->bytes_read == 0)
1025     g_simple_async_result_set_from_error (simple, error);
1026   
1027   if (nread > 0)
1028     data->bytes_read += nread;
1029   
1030   if (error)
1031     g_error_free (error);
1032   
1033   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1034   g_simple_async_result_complete (simple);
1035   g_object_unref (simple);
1036 }
1037
1038 static void
1039 read_fill_buffer_callback (GObject *source_object,
1040                            GAsyncResult *result,
1041                            gpointer user_data)
1042 {
1043   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1044   GBufferedInputStream *bstream;
1045   GBufferedInputStreamPrivate *priv;
1046   ReadAsyncData *data;
1047   GError *error;
1048   gssize nread;
1049   gsize available;
1050
1051   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1052   priv = bstream->priv;
1053   
1054   data = g_simple_async_result_get_op_res_gpointer (simple);
1055   
1056   error = NULL;
1057   nread = g_buffered_input_stream_fill_finish (bstream,
1058                                                result, &error);
1059   
1060   if (nread < 0 && data->bytes_read == 0)
1061     g_simple_async_result_set_from_error (simple, error);
1062
1063
1064   if (nread > 0)
1065     {
1066       available = priv->end - priv->pos;
1067       data->count = MIN (data->count, available);
1068       
1069       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1070       data->bytes_read += data->count;
1071       priv->pos += data->count;
1072     }
1073
1074   if (error)
1075     g_error_free (error);
1076   
1077   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1078   g_simple_async_result_complete (simple);
1079   g_object_unref (simple);
1080 }
1081
1082 static void
1083 g_buffered_input_stream_read_async (GInputStream              *stream,
1084                                     void                      *buffer,
1085                                     gsize                      count,
1086                                     int                        io_priority,
1087                                     GCancellable              *cancellable,
1088                                     GAsyncReadyCallback        callback,
1089                                     gpointer                   user_data)
1090 {
1091   GBufferedInputStream *bstream;
1092   GBufferedInputStreamPrivate *priv;
1093   GBufferedInputStreamClass *class;
1094   GInputStream *base_stream;
1095   gsize available;
1096   GSimpleAsyncResult *simple;
1097   ReadAsyncData *data;
1098
1099   bstream = G_BUFFERED_INPUT_STREAM (stream);
1100   priv = bstream->priv;
1101
1102   data = g_slice_new (ReadAsyncData);
1103   data->buffer = buffer;
1104   data->bytes_read = 0;
1105   simple = g_simple_async_result_new (G_OBJECT (stream),
1106                                       callback, user_data,
1107                                       g_buffered_input_stream_read_async);
1108   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1109   
1110   available = priv->end - priv->pos;
1111   
1112   if (count <= available)
1113     {
1114       memcpy (buffer, priv->buffer + priv->pos, count);
1115       priv->pos += count;
1116       data->bytes_read = count;
1117       
1118       g_simple_async_result_complete_in_idle (simple);
1119       g_object_unref (simple);
1120       return;
1121     }
1122
1123
1124   /* Full request not available, read all currently availbile and request refill for more */
1125   
1126   memcpy (buffer, priv->buffer + priv->pos, available);
1127   priv->pos = 0;
1128   priv->end = 0;
1129   
1130   count -= available;
1131   
1132   data->bytes_read = available;
1133   data->count = count;
1134
1135   if (count > priv->len)
1136     {
1137       /* Large request, shortcut buffer */
1138       
1139       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1140       
1141       g_input_stream_read_async (base_stream,
1142                                  (char *)buffer + data->bytes_read,
1143                                  count,
1144                                  io_priority, cancellable,
1145                                  large_read_callback,
1146                                  simple);
1147     }
1148   else
1149     {
1150       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1151       class->fill_async (bstream, priv->len, io_priority, cancellable,
1152                          read_fill_buffer_callback, simple);
1153     }
1154 }
1155
1156 static gssize
1157 g_buffered_input_stream_read_finish (GInputStream   *stream,
1158                                      GAsyncResult   *result,
1159                                      GError        **error)
1160 {
1161   GSimpleAsyncResult *simple;
1162   ReadAsyncData *data;
1163   
1164   simple = G_SIMPLE_ASYNC_RESULT (result);
1165   
1166   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1167
1168   data = g_simple_async_result_get_op_res_gpointer (simple);
1169   
1170   return data->bytes_read;
1171 }
1172
1173 typedef struct {
1174   gssize bytes_skipped;
1175   gssize count;
1176 } SkipAsyncData;
1177
1178 static void 
1179 free_skip_async_data (gpointer _data)
1180 {
1181   SkipAsyncData *data = _data;
1182   g_slice_free (SkipAsyncData, data);
1183 }
1184
1185 static void
1186 large_skip_callback (GObject *source_object,
1187                      GAsyncResult *result,
1188                      gpointer user_data)
1189 {
1190   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1191   SkipAsyncData *data;
1192   GError *error;
1193   gssize nread;
1194
1195   data = g_simple_async_result_get_op_res_gpointer (simple);
1196   
1197   error = NULL;
1198   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1199                                       result, &error);
1200
1201   /* Only report the error if we've not already read some data */
1202   if (nread < 0 && data->bytes_skipped == 0)
1203     g_simple_async_result_set_from_error (simple, error);
1204   
1205   if (nread > 0)
1206     data->bytes_skipped += nread;
1207   
1208   if (error)
1209     g_error_free (error);
1210   
1211   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1212   g_simple_async_result_complete (simple);
1213   g_object_unref (simple);
1214 }
1215
1216 static void
1217 skip_fill_buffer_callback (GObject *source_object,
1218                            GAsyncResult *result,
1219                            gpointer user_data)
1220 {
1221   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1222   GBufferedInputStream *bstream;
1223   GBufferedInputStreamPrivate *priv;
1224   SkipAsyncData *data;
1225   GError *error;
1226   gssize nread;
1227   gsize available;
1228
1229   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1230   priv = bstream->priv;
1231   
1232   data = g_simple_async_result_get_op_res_gpointer (simple);
1233   
1234   error = NULL;
1235   nread = g_buffered_input_stream_fill_finish (bstream,
1236                                                result, &error);
1237   
1238   if (nread < 0 && data->bytes_skipped == 0)
1239     g_simple_async_result_set_from_error (simple, error);
1240
1241
1242   if (nread > 0)
1243     {
1244       available = priv->end - priv->pos;
1245       data->count = MIN (data->count, available);
1246       
1247       data->bytes_skipped += data->count;
1248       priv->pos += data->count;
1249     }
1250
1251   if (error)
1252     g_error_free (error);
1253   
1254   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1255   g_simple_async_result_complete (simple);
1256   g_object_unref (simple);
1257 }
1258
1259 static void
1260 g_buffered_input_stream_skip_async (GInputStream              *stream,
1261                                     gsize                      count,
1262                                     int                        io_priority,
1263                                     GCancellable              *cancellable,
1264                                     GAsyncReadyCallback        callback,
1265                                     gpointer                   user_data)
1266 {
1267   GBufferedInputStream *bstream;
1268   GBufferedInputStreamPrivate *priv;
1269   GBufferedInputStreamClass *class;
1270   GInputStream *base_stream;
1271   gsize available;
1272   GSimpleAsyncResult *simple;
1273   SkipAsyncData *data;
1274
1275   bstream = G_BUFFERED_INPUT_STREAM (stream);
1276   priv = bstream->priv;
1277
1278   data = g_slice_new (SkipAsyncData);
1279   data->bytes_skipped = 0;
1280   simple = g_simple_async_result_new (G_OBJECT (stream),
1281                                       callback, user_data,
1282                                       g_buffered_input_stream_skip_async);
1283   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1284   
1285   available = priv->end - priv->pos;
1286   
1287   if (count <= available)
1288     {
1289       priv->pos += count;
1290       data->bytes_skipped = count;
1291       
1292       g_simple_async_result_complete_in_idle (simple);
1293       g_object_unref (simple);
1294       return;
1295     }
1296
1297
1298   /* Full request not available, skip all currently availbile and request refill for more */
1299   
1300   priv->pos = 0;
1301   priv->end = 0;
1302   
1303   count -= available;
1304   
1305   data->bytes_skipped = available;
1306   data->count = count;
1307
1308   if (count > priv->len)
1309     {
1310       /* Large request, shortcut buffer */
1311       
1312       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1313       
1314       g_input_stream_skip_async (base_stream,
1315                                  count,
1316                                  io_priority, cancellable,
1317                                  large_skip_callback,
1318                                  simple);
1319     }
1320   else
1321     {
1322       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1323       class->fill_async (bstream, priv->len, io_priority, cancellable,
1324                          skip_fill_buffer_callback, simple);
1325     }
1326 }
1327
1328 static gssize
1329 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1330                                      GAsyncResult   *result,
1331                                      GError        **error)
1332 {
1333   GSimpleAsyncResult *simple;
1334   SkipAsyncData *data;
1335   
1336   simple = G_SIMPLE_ASYNC_RESULT (result);
1337   
1338   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1339
1340   data = g_simple_async_result_get_op_res_gpointer (simple);
1341   
1342   return data->bytes_skipped;
1343 }
1344
1345
1346 #define __G_BUFFERED_INPUT_STREAM_C__
1347 #include "gioaliasdef.c"