Initial commit
[platform/upstream/glib2.0.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34 #include "gioalias.h"
35
36 /**
37  * SECTION:gbufferedinputstream
38  * @short_description: Buffered Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GFilterInputStream, #GInputStream
41  * 
42  * Buffered input stream implements #GFilterInputStream and provides 
43  * for buffered reads. 
44  * 
45  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46  * 
47  * To create a buffered input stream, use g_buffered_input_stream_new(), 
48  * or g_buffered_input_stream_new_sized() to specify the buffer's size at 
49  * construction.
50  * 
51  * To get the size of a buffer within a buffered input stream, use 
52  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
53  * buffered input stream's buffer, use
54  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 
55  * cannot be reduced below the size of the data within the buffer.
56  *
57  **/
58
59
60
/* Buffer size, in bytes, used as the default value of the "buffer-size"
 * property installed in class_init. */
#define DEFAULT_BUFFER_SIZE 4096
62
/* Per-instance private state: one byte buffer plus the cursors that
 * delimit the still-unread data inside it. */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer;  /* backing storage, (re)allocated by g_buffered_input_stream_set_buffer_size() */
  gsize   len;     /* allocated size of buffer in bytes; reported as "buffer-size" */
  gsize   pos;     /* offset in buffer of the next unread byte */
  gsize   end;     /* offset one past the last valid byte; end - pos bytes are available */
  GAsyncReadyCallback outstanding_callback; /* caller's callback saved by fill_async(), invoked by async_fill_callback_wrapper() */
};
70
/* GObject property identifiers handled by set_property()/get_property(). */
enum {
  PROP_0,       /* placeholder so that real property ids start at 1 */
  PROP_BUFSIZE  /* id under which the "buffer-size" property is installed */
};
75
/* GObject virtual methods, wired up in class_init. */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream virtual methods (skip/read and their async variants) and
 * the default GBufferedInputStream fill implementations, wired up in
 * class_init. */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);

/* Moves the unread bytes to the start of the buffer; used by real_fill. */
static void compact_buffer (GBufferedInputStream *stream);
131
/* Registers GBufferedInputStream as a subtype of G_TYPE_FILTER_INPUT_STREAM;
 * the g_buffered_input_stream_class_init()/_init() functions and the
 * g_buffered_input_stream_parent_class pointer used in this file go with it. */
G_DEFINE_TYPE (GBufferedInputStream,
               g_buffered_input_stream,
               G_TYPE_FILTER_INPUT_STREAM)
135
136
137 static void
138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139 {
140   GObjectClass *object_class;
141   GInputStreamClass *istream_class;
142   GBufferedInputStreamClass *bstream_class;
143
144   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
145
146   object_class = G_OBJECT_CLASS (klass);
147   object_class->get_property = g_buffered_input_stream_get_property;
148   object_class->set_property = g_buffered_input_stream_set_property;
149   object_class->finalize     = g_buffered_input_stream_finalize;
150
151   istream_class = G_INPUT_STREAM_CLASS (klass);
152   istream_class->skip = g_buffered_input_stream_skip;
153   istream_class->skip_async  = g_buffered_input_stream_skip_async;
154   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
155   istream_class->read_fn = g_buffered_input_stream_read;
156   istream_class->read_async  = g_buffered_input_stream_read_async;
157   istream_class->read_finish = g_buffered_input_stream_read_finish;
158
159   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
160   bstream_class->fill = g_buffered_input_stream_real_fill;
161   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
162   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
163   
164   g_object_class_install_property (object_class,
165                                    PROP_BUFSIZE,
166                                    g_param_spec_uint ("buffer-size",
167                                                       P_("Buffer Size"),
168                                                       P_("The size of the backend buffer"),
169                                                       1,
170                                                       G_MAXUINT,
171                                                       DEFAULT_BUFFER_SIZE,
172                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
173                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
174
175
176 }
177
178 /**
179  * g_buffered_input_stream_get_buffer_size:
180  * @stream: #GBufferedInputStream.
181  * 
182  * Gets the size of the input buffer.
183  * 
184  * Returns: the current buffer size.
185  **/
186 gsize
187 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
188 {
189   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
190
191   return stream->priv->len;
192 }
193
194 /**
195  * g_buffered_input_stream_set_buffer_size:
196  * @stream: #GBufferedInputStream.
197  * @size: a #gsize.
198  *
199  * Sets the size of the internal buffer of @stream to @size, or to the 
200  * size of the contents of the buffer. The buffer can never be resized 
201  * smaller than its current contents.
202  **/
203 void
204 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
205                                          gsize                  size)
206 {
207   GBufferedInputStreamPrivate *priv;
208   gsize in_buffer;
209   guint8 *buffer;
210   
211   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
212
213   priv = stream->priv;
214
215   if (priv->len == size)
216     return;
217
218   if (priv->buffer)
219     {
220       in_buffer = priv->end - priv->pos;
221
222       /* Never resize smaller than current buffer contents */
223       size = MAX (size, in_buffer);
224
225       buffer = g_malloc (size);
226       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
227       priv->len = size;
228       priv->pos = 0;
229       priv->end = in_buffer;
230       g_free (priv->buffer);
231       priv->buffer = buffer;
232     }
233   else
234     {
235       priv->len = size;
236       priv->pos = 0;
237       priv->end = 0;
238       priv->buffer = g_malloc (size);
239     }
240
241   g_object_notify (G_OBJECT (stream), "buffer-size");
242 }
243
244 static void
245 g_buffered_input_stream_set_property (GObject      *object,
246                                       guint         prop_id,
247                                       const GValue *value,
248                                       GParamSpec   *pspec)
249 {
250   GBufferedInputStream        *bstream;
251
252   bstream = G_BUFFERED_INPUT_STREAM (object);
253
254   switch (prop_id)
255     {
256     case PROP_BUFSIZE:
257       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
258       break;
259
260     default:
261       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
262       break;
263     }
264
265 }
266
267 static void
268 g_buffered_input_stream_get_property (GObject    *object,
269                                       guint       prop_id,
270                                       GValue     *value,
271                                       GParamSpec *pspec)
272 {
273   GBufferedInputStreamPrivate *priv;
274   GBufferedInputStream        *bstream;
275
276   bstream = G_BUFFERED_INPUT_STREAM (object);
277   priv = bstream->priv;
278
279   switch (prop_id)
280     { 
281     case PROP_BUFSIZE:
282       g_value_set_uint (value, priv->len);
283       break;
284
285     default:
286       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
287       break;
288     }
289 }
290
291 static void
292 g_buffered_input_stream_finalize (GObject *object)
293 {
294   GBufferedInputStreamPrivate *priv;
295   GBufferedInputStream        *stream;
296
297   stream = G_BUFFERED_INPUT_STREAM (object);
298   priv = stream->priv;
299
300   g_free (priv->buffer);
301
302   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
303 }
304
305 static void
306 g_buffered_input_stream_init (GBufferedInputStream *stream)
307 {
308   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
309                                               G_TYPE_BUFFERED_INPUT_STREAM,
310                                               GBufferedInputStreamPrivate);
311 }
312
313
314 /**
315  * g_buffered_input_stream_new:
316  * @base_stream: a #GInputStream.
317  * 
318  * Creates a new #GInputStream from the given @base_stream, with 
319  * a buffer set to the default size (4 kilobytes).
320  *
321  * Returns: a #GInputStream for the given @base_stream.
322  **/
323 GInputStream *
324 g_buffered_input_stream_new (GInputStream *base_stream)
325 {
326   GInputStream *stream;
327
328   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
329
330   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
331                          "base-stream", base_stream,
332                          NULL);
333
334   return stream;
335 }
336
337 /**
338  * g_buffered_input_stream_new_sized:
339  * @base_stream: a #GInputStream.
340  * @size: a #gsize.
341  * 
342  * Creates a new #GBufferedInputStream from the given @base_stream, 
343  * with a buffer set to @size.
344  *
345  * Returns: a #GInputStream.
346  **/
347 GInputStream *
348 g_buffered_input_stream_new_sized (GInputStream *base_stream,
349                                    gsize         size)
350 {
351   GInputStream *stream;
352
353   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
354
355   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
356                          "base-stream", base_stream,
357                          "buffer-size", (guint)size,
358                          NULL);
359
360   return stream;
361 }
362
363 /**
364  * g_buffered_input_stream_fill:
365  * @stream: #GBufferedInputStream.
366  * @count: the number of bytes that will be read from the stream.
367  * @cancellable: optional #GCancellable object, %NULL to ignore.
 * @error: location to store the error occurring, or %NULL to ignore.
369  *
370  * Tries to read @count bytes from the stream into the buffer. 
371  * Will block during this read.
372  * 
373  * If @count is zero, returns zero and does nothing. A value of @count
374  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
375  *
376  * On success, the number of bytes read into the buffer is returned.
377  * It is not an error if this is not the same as the requested size, as it
378  * can happen e.g. near the end of a file. Zero is returned on end of file
379  * (or if @count is zero),  but never otherwise.
380  *
381  * If @count is -1 then the attempted read size is equal to the number of
382  * bytes that are required to fill the buffer.
383  *
384  * If @cancellable is not %NULL, then the operation can be cancelled by
385  * triggering the cancellable object from another thread. If the operation
386  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
387  * operation was partially finished when the operation was cancelled the
388  * partial result will be returned, without an error.
389  *
390  * On error -1 is returned and @error is set accordingly.
391  * 
392  * For the asynchronous, non-blocking, version of this function, see 
393  * g_buffered_input_stream_fill_async().
394  *
395  * Returns: the number of bytes read into @stream's buffer, up to @count, 
396  *     or -1 on error.
397  **/
398 gssize
399 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
400                               gssize                 count,
401                               GCancellable          *cancellable,
402                               GError               **error)
403 {
404   GBufferedInputStreamClass *class;
405   GInputStream *input_stream;
406   gssize res;
407
408   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
409   
410   input_stream = G_INPUT_STREAM (stream);
411   
412   if (count < -1)
413     {
414       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
415                    _("Too large count value passed to %s"), G_STRFUNC);
416       return -1;
417     }
418
419   if (!g_input_stream_set_pending (input_stream, error))
420     return -1;
421
422   if (cancellable)
423     g_cancellable_push_current (cancellable);
424   
425   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
426   res = class->fill (stream, count, cancellable, error);
427
428   if (cancellable)
429     g_cancellable_pop_current (cancellable);
430   
431   g_input_stream_clear_pending (input_stream);
432   
433   return res;
434 }
435
436 static void
437 async_fill_callback_wrapper (GObject      *source_object,
438                              GAsyncResult *res,
439                              gpointer      user_data)
440 {
441   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
442
443   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
444   (*stream->priv->outstanding_callback) (source_object, res, user_data);
445   g_object_unref (stream);
446 }
447
448 /**
449  * g_buffered_input_stream_fill_async:
450  * @stream: #GBufferedInputStream.
451  * @count: the number of bytes that will be read from the stream.
452  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
453  *     of the request.
454  * @cancellable: optional #GCancellable object
455  * @callback: a #GAsyncReadyCallback.
456  * @user_data: a #gpointer.
457  *
458  * Reads data into @stream's buffer asynchronously, up to @count size.
459  * @io_priority can be used to prioritize reads. For the synchronous
460  * version of this function, see g_buffered_input_stream_fill().
461  *
462  * If @count is -1 then the attempted read size is equal to the number
463  * of bytes that are required to fill the buffer.
464  **/
465 void
466 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
467                                     gssize                count,
468                                     int                   io_priority,
469                                     GCancellable         *cancellable,
470                                     GAsyncReadyCallback   callback,
471                                     gpointer              user_data)
472 {
473   GBufferedInputStreamClass *class;
474   GSimpleAsyncResult *simple;
475   GError *error = NULL;
476
477   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
478
479   if (count == 0)
480     {
481       simple = g_simple_async_result_new (G_OBJECT (stream),
482                                           callback,
483                                           user_data,
484                                           g_buffered_input_stream_fill_async);
485       g_simple_async_result_complete_in_idle (simple);
486       g_object_unref (simple);
487       return;
488     }
489   
490   if (count < -1)
491     {
492       g_simple_async_report_error_in_idle (G_OBJECT (stream),
493                                            callback,
494                                            user_data,
495                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
496                                            _("Too large count value passed to %s"),
497                                            G_STRFUNC);
498       return;
499     }
500   
501   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
502     {
503       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
504                                             callback,
505                                             user_data,
506                                             error);
507       g_error_free (error);
508       return;
509     }
510     
511   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
512   
513   stream->priv->outstanding_callback = callback;
514   g_object_ref (stream);
515   class->fill_async (stream, count, io_priority, cancellable,
516                      async_fill_callback_wrapper, user_data);
517 }
518
519 /**
520  * g_buffered_input_stream_fill_finish:
521  * @stream: a #GBufferedInputStream.
522  * @result: a #GAsyncResult.
523  * @error: a #GError.
524  *
525  * Finishes an asynchronous read.
526  * 
527  * Returns: a #gssize of the read stream, or %-1 on an error. 
528  **/
529 gssize
530 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
531                                      GAsyncResult          *result,
532                                      GError               **error)
533 {
534   GSimpleAsyncResult *simple;
535   GBufferedInputStreamClass *class;
536
537   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
538   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
539
540   if (G_IS_SIMPLE_ASYNC_RESULT (result))
541     {
542       simple = G_SIMPLE_ASYNC_RESULT (result);
543       if (g_simple_async_result_propagate_error (simple, error))
544         return -1;
545
546       /* Special case read of 0 bytes */
547       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
548         return 0;
549     }
550
551   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
552   return class->fill_finish (stream, result, error);
553 }
554
555 /**
556  * g_buffered_input_stream_get_available:
557  * @stream: #GBufferedInputStream.
558  * 
559  * Gets the size of the available data within the stream.
560  * 
561  * Returns: size of the available stream. 
562  **/
563 gsize
564 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
565 {
566   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
567
568   return stream->priv->end - stream->priv->pos;
569 }
570
571 /**
572  * g_buffered_input_stream_peek:
573  * @stream: a #GBufferedInputStream.
574  * @buffer: a pointer to an allocated chunk of memory.
575  * @offset: a #gsize.
576  * @count: a #gsize.
577  * 
578  * Peeks in the buffer, copying data of size @count into @buffer, 
579  * offset @offset bytes.
580  * 
581  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
582  **/
583 gsize
584 g_buffered_input_stream_peek (GBufferedInputStream *stream,
585                               void                 *buffer,
586                               gsize                 offset,
587                               gsize                 count)
588 {
589   gsize available;
590   gsize end;
591   
592   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
593   g_return_val_if_fail (buffer != NULL, -1);
594
595   available = g_buffered_input_stream_get_available (stream);
596
597   if (offset > available)
598     return 0;
599   
600   end = MIN (offset + count, available);
601   count = end - offset;
602   
603   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
604   return count;
605 }
606
607 /**
608  * g_buffered_input_stream_peek_buffer:
609  * @stream: a #GBufferedInputStream.
610  * @count: a #gsize to get the number of bytes available in the buffer.
611  *
612  * Returns the buffer with the currently available bytes. The returned
613  * buffer must not be modified and will become invalid when reading from
614  * the stream or filling the buffer.
615  *
616  * Returns: read-only buffer
617  **/
618 const void*
619 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
620                                      gsize                *count)
621 {
622   GBufferedInputStreamPrivate *priv;
623
624   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
625
626   priv = stream->priv;
627
628   if (count) 
629     *count = priv->end - priv->pos;
630
631   return priv->buffer + priv->pos;
632 }
633
634 static void
635 compact_buffer (GBufferedInputStream *stream)
636 {
637   GBufferedInputStreamPrivate *priv;
638   gsize current_size;
639
640   priv = stream->priv;
641
642   current_size = priv->end - priv->pos;
643   
644   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
645   
646   priv->pos = 0;
647   priv->end = current_size;
648 }
649
650 static gssize
651 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
652                                    gssize                 count,
653                                    GCancellable          *cancellable,
654                                    GError               **error)
655 {
656   GBufferedInputStreamPrivate *priv;
657   GInputStream *base_stream;
658   gssize nread;
659   gsize in_buffer;
660
661   priv = stream->priv;
662
663   if (count == -1)
664     count = priv->len;
665   
666   in_buffer = priv->end - priv->pos;
667
668   /* Never fill more than can fit in the buffer */
669   count = MIN (count, priv->len - in_buffer);
670
671   /* If requested length does not fit at end, compact */
672   if (priv->len - priv->end < count)
673     compact_buffer (stream);
674
675   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
676   nread = g_input_stream_read (base_stream,
677                                priv->buffer + priv->end,
678                                count,
679                                cancellable,
680                                error);
681
682   if (nread > 0)
683     priv->end += nread;
684   
685   return nread;
686 }
687
688 static gssize
689 g_buffered_input_stream_skip (GInputStream  *stream,
690                               gsize          count,
691                               GCancellable  *cancellable,
692                               GError       **error)
693 {
694   GBufferedInputStream        *bstream;
695   GBufferedInputStreamPrivate *priv;
696   GBufferedInputStreamClass *class;
697   GInputStream *base_stream;
698   gsize available, bytes_skipped;
699   gssize nread;
700
701   bstream = G_BUFFERED_INPUT_STREAM (stream);
702   priv = bstream->priv;
703
704   available = priv->end - priv->pos;
705
706   if (count <= available)
707     {
708       priv->pos += count;
709       return count;
710     }
711
712   /* Full request not available, skip all currently available and 
713    * request refill for more 
714    */
715   
716   priv->pos = 0;
717   priv->end = 0;
718   bytes_skipped = available;
719   count -= available;
720
721   if (bytes_skipped > 0)
722     error = NULL; /* Ignore further errors if we already read some data */
723   
724   if (count > priv->len)
725     {
726       /* Large request, shortcut buffer */
727       
728       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
729
730       nread = g_input_stream_skip (base_stream,
731                                    count,
732                                    cancellable,
733                                    error);
734       
735       if (nread < 0 && bytes_skipped == 0)
736         return -1;
737       
738       if (nread > 0)
739         bytes_skipped += nread;
740       
741       return bytes_skipped;
742     }
743   
744   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
745   nread = class->fill (bstream, priv->len, cancellable, error);
746   
747   if (nread < 0)
748     {
749       if (bytes_skipped == 0)
750         return -1;
751       else
752         return bytes_skipped;
753     }
754   
755   available = priv->end - priv->pos;
756   count = MIN (count, available);
757   
758   bytes_skipped += count;
759   priv->pos += count;
760   
761   return bytes_skipped;
762 }
763
764 static gssize
765 g_buffered_input_stream_read (GInputStream *stream,
766                               void         *buffer,
767                               gsize         count,
768                               GCancellable *cancellable,
769                               GError      **error)
770 {
771   GBufferedInputStream        *bstream;
772   GBufferedInputStreamPrivate *priv;
773   GBufferedInputStreamClass *class;
774   GInputStream *base_stream;
775   gsize available, bytes_read;
776   gssize nread;
777
778   bstream = G_BUFFERED_INPUT_STREAM (stream);
779   priv = bstream->priv;
780
781   available = priv->end - priv->pos;
782
783   if (count <= available)
784     {
785       memcpy (buffer, priv->buffer + priv->pos, count);
786       priv->pos += count;
787       return count;
788     }
789   
790   /* Full request not available, read all currently availbile and request refill for more */
791   
792   memcpy (buffer, priv->buffer + priv->pos, available);
793   priv->pos = 0;
794   priv->end = 0;
795   bytes_read = available;
796   count -= available;
797
798   if (bytes_read > 0)
799     error = NULL; /* Ignore further errors if we already read some data */
800   
801   if (count > priv->len)
802     {
803       /* Large request, shortcut buffer */
804       
805       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
806
807       nread = g_input_stream_read (base_stream,
808                                    (char *)buffer + bytes_read,
809                                    count,
810                                    cancellable,
811                                    error);
812       
813       if (nread < 0 && bytes_read == 0)
814         return -1;
815       
816       if (nread > 0)
817         bytes_read += nread;
818       
819       return bytes_read;
820     }
821   
822   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
823   nread = class->fill (bstream, priv->len, cancellable, error);
824   if (nread < 0)
825     {
826       if (bytes_read == 0)
827         return -1;
828       else
829         return bytes_read;
830     }
831   
832   available = priv->end - priv->pos;
833   count = MIN (count, available);
834   
835   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
836   bytes_read += count;
837   priv->pos += count;
838   
839   return bytes_read;
840 }
841
842 /**
843  * g_buffered_input_stream_read_byte:
844  * @stream: #GBufferedInputStream.
845  * @cancellable: optional #GCancellable object, %NULL to ignore.
 * @error: location to store the error occurring, or %NULL to ignore.
847  *
848  * Tries to read a single byte from the stream or the buffer. Will block
849  * during this read.
850  *
851  * On success, the byte read from the stream is returned. On end of stream
852  * -1 is returned but it's not an exceptional error and @error is not set.
853  * 
854  * If @cancellable is not %NULL, then the operation can be cancelled by
855  * triggering the cancellable object from another thread. If the operation
856  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
857  * operation was partially finished when the operation was cancelled the
858  * partial result will be returned, without an error.
859  *
860  * On error -1 is returned and @error is set accordingly.
861  * 
862  * Returns: the byte read from the @stream, or -1 on end of stream or error.
863  **/
864 int
865 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
866                                    GCancellable          *cancellable,
867                                    GError               **error)
868 {
869   GBufferedInputStreamPrivate *priv;
870   GBufferedInputStreamClass *class;
871   GInputStream *input_stream;
872   gsize available;
873   gssize nread;
874
875   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
876
877   priv = stream->priv;
878   input_stream = G_INPUT_STREAM (stream);
879
880   if (g_input_stream_is_closed (input_stream))
881     {
882       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
883                            _("Stream is already closed"));
884       return -1;
885     }
886
887   if (!g_input_stream_set_pending (input_stream, error))
888     return -1;
889
890   available = priv->end - priv->pos;
891
892   if (available != 0)
893     {
894       g_input_stream_clear_pending (input_stream);
895       return priv->buffer[priv->pos++];
896     }
897
898   /* Byte not available, request refill for more */
899
900   if (cancellable)
901     g_cancellable_push_current (cancellable);
902
903   priv->pos = 0;
904   priv->end = 0;
905
906   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
907   nread = class->fill (stream, priv->len, cancellable, error);
908
909   if (cancellable)
910     g_cancellable_pop_current (cancellable);
911
912   g_input_stream_clear_pending (input_stream);
913
914   if (nread <= 0)
915     return -1; /* error or end of stream */
916
917   return priv->buffer[priv->pos++];
918 }
919
920 /* ************************** */
921 /* Async stuff implementation */
922 /* ************************** */
923
924 static void
925 fill_async_callback (GObject      *source_object,
926                      GAsyncResult *result,
927                      gpointer      user_data)
928 {
929   GError *error;
930   gssize res;
931   GSimpleAsyncResult *simple;
932
933   simple = user_data;
934   
935   error = NULL;
936   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
937                                     result, &error);
938
939   g_simple_async_result_set_op_res_gssize (simple, res);
940   if (res == -1)
941     {
942       g_simple_async_result_set_from_error (simple, error);
943       g_error_free (error);
944     }
945   else
946     {
947       GBufferedInputStreamPrivate *priv;
948       GObject *object;
949
950       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
951       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
952
953       g_assert_cmpint (priv->end + res, <=, priv->len);
954       priv->end += res;
955
956       g_object_unref (object);
957     }
958   
959   /* Complete immediately, not in idle, since we're already in a mainloop callout */
960   g_simple_async_result_complete (simple);
961   g_object_unref (simple);
962 }
963
964 static void
965 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
966                                          gssize                count,
967                                          int                   io_priority,
968                                          GCancellable         *cancellable,
969                                          GAsyncReadyCallback   callback,
970                                          gpointer              user_data)
971 {
972   GBufferedInputStreamPrivate *priv;
973   GInputStream *base_stream;
974   GSimpleAsyncResult *simple;
975   gsize in_buffer;
976
977   priv = stream->priv;
978
979   if (count == -1)
980     count = priv->len;
981   
982   in_buffer = priv->end - priv->pos;
983
984   /* Never fill more than can fit in the buffer */
985   count = MIN (count, priv->len - in_buffer);
986
987   /* If requested length does not fit at end, compact */
988   if (priv->len - priv->end < count)
989     compact_buffer (stream);
990
991   simple = g_simple_async_result_new (G_OBJECT (stream),
992                                       callback, user_data,
993                                       g_buffered_input_stream_real_fill_async);
994   
995   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
996   g_input_stream_read_async (base_stream,
997                              priv->buffer + priv->end,
998                              count,
999                              io_priority,
1000                              cancellable,
1001                              fill_async_callback,
1002                              simple);
1003 }
1004
1005 static gssize
1006 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1007                                           GAsyncResult         *result,
1008                                           GError              **error)
1009 {
1010   GSimpleAsyncResult *simple;
1011   gssize nread;
1012
1013   simple = G_SIMPLE_ASYNC_RESULT (result);
1014   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1015   
1016   nread = g_simple_async_result_get_op_res_gssize (simple);
1017   return nread;
1018 }
1019
/* Per-operation state of an asynchronous read on the buffered stream.
 * Stored as the operation result of the GSimpleAsyncResult and released
 * with free_read_async_data(). */
typedef struct {
  gssize bytes_read; /* bytes copied into @buffer so far; returned by read_finish */
  gssize count;      /* bytes still wanted after the internal buffer was drained */
  void *buffer;      /* caller-supplied destination buffer */
} ReadAsyncData;
1025
1026 static void 
1027 free_read_async_data (gpointer _data)
1028 {
1029   ReadAsyncData *data = _data;
1030   g_slice_free (ReadAsyncData, data);
1031 }
1032
1033 static void
1034 large_read_callback (GObject *source_object,
1035                      GAsyncResult *result,
1036                      gpointer user_data)
1037 {
1038   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1039   ReadAsyncData *data;
1040   GError *error;
1041   gssize nread;
1042
1043   data = g_simple_async_result_get_op_res_gpointer (simple);
1044   
1045   error = NULL;
1046   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1047                                       result, &error);
1048
1049   /* Only report the error if we've not already read some data */
1050   if (nread < 0 && data->bytes_read == 0)
1051     g_simple_async_result_set_from_error (simple, error);
1052   
1053   if (nread > 0)
1054     data->bytes_read += nread;
1055   
1056   if (error)
1057     g_error_free (error);
1058   
1059   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1060   g_simple_async_result_complete (simple);
1061   g_object_unref (simple);
1062 }
1063
1064 static void
1065 read_fill_buffer_callback (GObject *source_object,
1066                            GAsyncResult *result,
1067                            gpointer user_data)
1068 {
1069   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1070   GBufferedInputStream *bstream;
1071   GBufferedInputStreamPrivate *priv;
1072   ReadAsyncData *data;
1073   GError *error;
1074   gssize nread;
1075   gsize available;
1076
1077   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1078   priv = bstream->priv;
1079   
1080   data = g_simple_async_result_get_op_res_gpointer (simple);
1081   
1082   error = NULL;
1083   nread = g_buffered_input_stream_fill_finish (bstream,
1084                                                result, &error);
1085   
1086   if (nread < 0 && data->bytes_read == 0)
1087     g_simple_async_result_set_from_error (simple, error);
1088
1089
1090   if (nread > 0)
1091     {
1092       available = priv->end - priv->pos;
1093       data->count = MIN (data->count, available);
1094       
1095       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1096       data->bytes_read += data->count;
1097       priv->pos += data->count;
1098     }
1099
1100   if (error)
1101     g_error_free (error);
1102   
1103   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1104   g_simple_async_result_complete (simple);
1105   g_object_unref (simple);
1106 }
1107
1108 static void
1109 g_buffered_input_stream_read_async (GInputStream              *stream,
1110                                     void                      *buffer,
1111                                     gsize                      count,
1112                                     int                        io_priority,
1113                                     GCancellable              *cancellable,
1114                                     GAsyncReadyCallback        callback,
1115                                     gpointer                   user_data)
1116 {
1117   GBufferedInputStream *bstream;
1118   GBufferedInputStreamPrivate *priv;
1119   GBufferedInputStreamClass *class;
1120   GInputStream *base_stream;
1121   gsize available;
1122   GSimpleAsyncResult *simple;
1123   ReadAsyncData *data;
1124
1125   bstream = G_BUFFERED_INPUT_STREAM (stream);
1126   priv = bstream->priv;
1127
1128   data = g_slice_new (ReadAsyncData);
1129   data->buffer = buffer;
1130   data->bytes_read = 0;
1131   simple = g_simple_async_result_new (G_OBJECT (stream),
1132                                       callback, user_data,
1133                                       g_buffered_input_stream_read_async);
1134   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1135   
1136   available = priv->end - priv->pos;
1137   
1138   if (count <= available)
1139     {
1140       memcpy (buffer, priv->buffer + priv->pos, count);
1141       priv->pos += count;
1142       data->bytes_read = count;
1143       
1144       g_simple_async_result_complete_in_idle (simple);
1145       g_object_unref (simple);
1146       return;
1147     }
1148
1149
1150   /* Full request not available, read all currently availbile and request refill for more */
1151   
1152   memcpy (buffer, priv->buffer + priv->pos, available);
1153   priv->pos = 0;
1154   priv->end = 0;
1155   
1156   count -= available;
1157   
1158   data->bytes_read = available;
1159   data->count = count;
1160
1161   if (count > priv->len)
1162     {
1163       /* Large request, shortcut buffer */
1164       
1165       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1166       
1167       g_input_stream_read_async (base_stream,
1168                                  (char *)buffer + data->bytes_read,
1169                                  count,
1170                                  io_priority, cancellable,
1171                                  large_read_callback,
1172                                  simple);
1173     }
1174   else
1175     {
1176       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1177       class->fill_async (bstream, priv->len, io_priority, cancellable,
1178                          read_fill_buffer_callback, simple);
1179     }
1180 }
1181
1182 static gssize
1183 g_buffered_input_stream_read_finish (GInputStream   *stream,
1184                                      GAsyncResult   *result,
1185                                      GError        **error)
1186 {
1187   GSimpleAsyncResult *simple;
1188   ReadAsyncData *data;
1189   
1190   simple = G_SIMPLE_ASYNC_RESULT (result);
1191   
1192   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1193
1194   data = g_simple_async_result_get_op_res_gpointer (simple);
1195   
1196   return data->bytes_read;
1197 }
1198
/* Per-operation state of an asynchronous skip on the buffered stream.
 * Stored as the operation result of the GSimpleAsyncResult and released
 * with free_skip_async_data(). */
typedef struct {
  gssize bytes_skipped; /* bytes skipped so far; returned by skip_finish */
  gssize count;         /* bytes still to skip after the internal buffer was drained */
} SkipAsyncData;
1203
1204 static void 
1205 free_skip_async_data (gpointer _data)
1206 {
1207   SkipAsyncData *data = _data;
1208   g_slice_free (SkipAsyncData, data);
1209 }
1210
1211 static void
1212 large_skip_callback (GObject *source_object,
1213                      GAsyncResult *result,
1214                      gpointer user_data)
1215 {
1216   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1217   SkipAsyncData *data;
1218   GError *error;
1219   gssize nread;
1220
1221   data = g_simple_async_result_get_op_res_gpointer (simple);
1222   
1223   error = NULL;
1224   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1225                                       result, &error);
1226
1227   /* Only report the error if we've not already read some data */
1228   if (nread < 0 && data->bytes_skipped == 0)
1229     g_simple_async_result_set_from_error (simple, error);
1230   
1231   if (nread > 0)
1232     data->bytes_skipped += nread;
1233   
1234   if (error)
1235     g_error_free (error);
1236   
1237   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1238   g_simple_async_result_complete (simple);
1239   g_object_unref (simple);
1240 }
1241
1242 static void
1243 skip_fill_buffer_callback (GObject *source_object,
1244                            GAsyncResult *result,
1245                            gpointer user_data)
1246 {
1247   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1248   GBufferedInputStream *bstream;
1249   GBufferedInputStreamPrivate *priv;
1250   SkipAsyncData *data;
1251   GError *error;
1252   gssize nread;
1253   gsize available;
1254
1255   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1256   priv = bstream->priv;
1257   
1258   data = g_simple_async_result_get_op_res_gpointer (simple);
1259   
1260   error = NULL;
1261   nread = g_buffered_input_stream_fill_finish (bstream,
1262                                                result, &error);
1263   
1264   if (nread < 0 && data->bytes_skipped == 0)
1265     g_simple_async_result_set_from_error (simple, error);
1266
1267
1268   if (nread > 0)
1269     {
1270       available = priv->end - priv->pos;
1271       data->count = MIN (data->count, available);
1272       
1273       data->bytes_skipped += data->count;
1274       priv->pos += data->count;
1275     }
1276
1277   if (error)
1278     g_error_free (error);
1279   
1280   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1281   g_simple_async_result_complete (simple);
1282   g_object_unref (simple);
1283 }
1284
1285 static void
1286 g_buffered_input_stream_skip_async (GInputStream              *stream,
1287                                     gsize                      count,
1288                                     int                        io_priority,
1289                                     GCancellable              *cancellable,
1290                                     GAsyncReadyCallback        callback,
1291                                     gpointer                   user_data)
1292 {
1293   GBufferedInputStream *bstream;
1294   GBufferedInputStreamPrivate *priv;
1295   GBufferedInputStreamClass *class;
1296   GInputStream *base_stream;
1297   gsize available;
1298   GSimpleAsyncResult *simple;
1299   SkipAsyncData *data;
1300
1301   bstream = G_BUFFERED_INPUT_STREAM (stream);
1302   priv = bstream->priv;
1303
1304   data = g_slice_new (SkipAsyncData);
1305   data->bytes_skipped = 0;
1306   simple = g_simple_async_result_new (G_OBJECT (stream),
1307                                       callback, user_data,
1308                                       g_buffered_input_stream_skip_async);
1309   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1310   
1311   available = priv->end - priv->pos;
1312   
1313   if (count <= available)
1314     {
1315       priv->pos += count;
1316       data->bytes_skipped = count;
1317       
1318       g_simple_async_result_complete_in_idle (simple);
1319       g_object_unref (simple);
1320       return;
1321     }
1322
1323
1324   /* Full request not available, skip all currently availbile and request refill for more */
1325   
1326   priv->pos = 0;
1327   priv->end = 0;
1328   
1329   count -= available;
1330   
1331   data->bytes_skipped = available;
1332   data->count = count;
1333
1334   if (count > priv->len)
1335     {
1336       /* Large request, shortcut buffer */
1337       
1338       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1339       
1340       g_input_stream_skip_async (base_stream,
1341                                  count,
1342                                  io_priority, cancellable,
1343                                  large_skip_callback,
1344                                  simple);
1345     }
1346   else
1347     {
1348       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1349       class->fill_async (bstream, priv->len, io_priority, cancellable,
1350                          skip_fill_buffer_callback, simple);
1351     }
1352 }
1353
1354 static gssize
1355 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1356                                      GAsyncResult   *result,
1357                                      GError        **error)
1358 {
1359   GSimpleAsyncResult *simple;
1360   SkipAsyncData *data;
1361   
1362   simple = G_SIMPLE_ASYNC_RESULT (result);
1363   
1364   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1365
1366   data = g_simple_async_result_get_op_res_gpointer (simple);
1367   
1368   return data->bytes_skipped;
1369 }
1370
1371
1372 #define __G_BUFFERED_INPUT_STREAM_C__
1373 #include "gioaliasdef.c"