Trivial cleanups
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34 #include "gioalias.h"
35
36 /**
37  * SECTION:gbufferedinputstream
38  * @short_description: Buffered Input Stream
39  * @include: gio/gio.h
40  * @see_also: #GFilterInputStream, #GInputStream
41  *
42  * Buffered input stream implements #GFilterInputStream and provides
43  * for buffered reads.
44  *
45  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
46  *
47  * To create a buffered input stream, use g_buffered_input_stream_new(),
48  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
49  * construction.
50  *
51  * To get the size of a buffer within a buffered input stream, use
52  * g_buffered_input_stream_get_buffer_size(). To change the size of a
53  * buffered input stream's buffer, use
54  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
55  * cannot be reduced below the size of the data within the buffer.
56  */
57
58
59 #define DEFAULT_BUFFER_SIZE 4096
60
61 struct _GBufferedInputStreamPrivate {
62   guint8 *buffer;
63   gsize   len;
64   gsize   pos;
65   gsize   end;
66   GAsyncReadyCallback outstanding_callback;
67 };
68
69 enum {
70   PROP_0,
71   PROP_BUFSIZE
72 };
73
74 static void g_buffered_input_stream_set_property  (GObject      *object,
75                                                    guint         prop_id,
76                                                    const GValue *value,
77                                                    GParamSpec   *pspec);
78
79 static void g_buffered_input_stream_get_property  (GObject      *object,
80                                                    guint         prop_id,
81                                                    GValue       *value,
82                                                    GParamSpec   *pspec);
83 static void g_buffered_input_stream_finalize      (GObject *object);
84
85
86 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
87                                                         gsize                  count,
88                                                         GCancellable          *cancellable,
89                                                         GError               **error);
90 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
91                                                         gsize                  count,
92                                                         int                    io_priority,
93                                                         GCancellable          *cancellable,
94                                                         GAsyncReadyCallback    callback,
95                                                         gpointer               user_data);
96 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
97                                                         GAsyncResult          *result,
98                                                         GError               **error);
99 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
100                                                         void                  *buffer,
101                                                         gsize                  count,
102                                                         GCancellable          *cancellable,
103                                                         GError               **error);
104 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
105                                                         void                  *buffer,
106                                                         gsize                  count,
107                                                         int                    io_priority,
108                                                         GCancellable          *cancellable,
109                                                         GAsyncReadyCallback    callback,
110                                                         gpointer               user_data);
111 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
112                                                         GAsyncResult          *result,
113                                                         GError               **error);
114 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
115                                                         gssize                 count,
116                                                         GCancellable          *cancellable,
117                                                         GError               **error);
118 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
119                                                         gssize                 count,
120                                                         int                    io_priority,
121                                                         GCancellable          *cancellable,
122                                                         GAsyncReadyCallback    callback,
123                                                         gpointer               user_data);
124 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
125                                                         GAsyncResult          *result,
126                                                         GError               **error);
127
128 static void compact_buffer (GBufferedInputStream *stream);
129
130 G_DEFINE_TYPE (GBufferedInputStream,
131                g_buffered_input_stream,
132                G_TYPE_FILTER_INPUT_STREAM)
133
134
135 static void
136 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
137 {
138   GObjectClass *object_class;
139   GInputStreamClass *istream_class;
140   GBufferedInputStreamClass *bstream_class;
141
142   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
143
144   object_class = G_OBJECT_CLASS (klass);
145   object_class->get_property = g_buffered_input_stream_get_property;
146   object_class->set_property = g_buffered_input_stream_set_property;
147   object_class->finalize     = g_buffered_input_stream_finalize;
148
149   istream_class = G_INPUT_STREAM_CLASS (klass);
150   istream_class->skip = g_buffered_input_stream_skip;
151   istream_class->skip_async  = g_buffered_input_stream_skip_async;
152   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
153   istream_class->read_fn = g_buffered_input_stream_read;
154   istream_class->read_async  = g_buffered_input_stream_read_async;
155   istream_class->read_finish = g_buffered_input_stream_read_finish;
156
157   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
158   bstream_class->fill = g_buffered_input_stream_real_fill;
159   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
160   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
161
162   g_object_class_install_property (object_class,
163                                    PROP_BUFSIZE,
164                                    g_param_spec_uint ("buffer-size",
165                                                       P_("Buffer Size"),
166                                                       P_("The size of the backend buffer"),
167                                                       1,
168                                                       G_MAXUINT,
169                                                       DEFAULT_BUFFER_SIZE,
170                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
171                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
172
173
174 }
175
176 /**
177  * g_buffered_input_stream_get_buffer_size:
178  * @stream: a #GBufferedInputStream
179  *
180  * Gets the size of the input buffer.
181  *
182  * Returns: the current buffer size.
183  */
184 gsize
185 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
186 {
187   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
188
189   return stream->priv->len;
190 }
191
192 /**
193  * g_buffered_input_stream_set_buffer_size:
194  * @stream: a #GBufferedInputStream
195  * @size: a #gsize
196  *
197  * Sets the size of the internal buffer of @stream to @size, or to the
198  * size of the contents of the buffer. The buffer can never be resized
199  * smaller than its current contents.
200  */
201 void
202 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
203                                          gsize                 size)
204 {
205   GBufferedInputStreamPrivate *priv;
206   gsize in_buffer;
207   guint8 *buffer;
208
209   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
210
211   priv = stream->priv;
212
213   if (priv->len == size)
214     return;
215
216   if (priv->buffer)
217     {
218       in_buffer = priv->end - priv->pos;
219
220       /* Never resize smaller than current buffer contents */
221       size = MAX (size, in_buffer);
222
223       buffer = g_malloc (size);
224       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
225       priv->len = size;
226       priv->pos = 0;
227       priv->end = in_buffer;
228       g_free (priv->buffer);
229       priv->buffer = buffer;
230     }
231   else
232     {
233       priv->len = size;
234       priv->pos = 0;
235       priv->end = 0;
236       priv->buffer = g_malloc (size);
237     }
238
239   g_object_notify (G_OBJECT (stream), "buffer-size");
240 }
241
242 static void
243 g_buffered_input_stream_set_property (GObject      *object,
244                                       guint         prop_id,
245                                       const GValue *value,
246                                       GParamSpec   *pspec)
247 {
248   GBufferedInputStream        *bstream;
249
250   bstream = G_BUFFERED_INPUT_STREAM (object);
251
252   switch (prop_id)
253     {
254     case PROP_BUFSIZE:
255       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
256       break;
257
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
260       break;
261     }
262 }
263
264 static void
265 g_buffered_input_stream_get_property (GObject    *object,
266                                       guint       prop_id,
267                                       GValue     *value,
268                                       GParamSpec *pspec)
269 {
270   GBufferedInputStreamPrivate *priv;
271   GBufferedInputStream        *bstream;
272
273   bstream = G_BUFFERED_INPUT_STREAM (object);
274   priv = bstream->priv;
275
276   switch (prop_id)
277     {
278     case PROP_BUFSIZE:
279       g_value_set_uint (value, priv->len);
280       break;
281
282     default:
283       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
284       break;
285     }
286 }
287
288 static void
289 g_buffered_input_stream_finalize (GObject *object)
290 {
291   GBufferedInputStreamPrivate *priv;
292   GBufferedInputStream        *stream;
293
294   stream = G_BUFFERED_INPUT_STREAM (object);
295   priv = stream->priv;
296
297   g_free (priv->buffer);
298
299   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
300 }
301
302 static void
303 g_buffered_input_stream_init (GBufferedInputStream *stream)
304 {
305   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
306                                               G_TYPE_BUFFERED_INPUT_STREAM,
307                                               GBufferedInputStreamPrivate);
308 }
309
310
311 /**
312  * g_buffered_input_stream_new:
313  * @base_stream: a #GInputStream
314  *
315  * Creates a new #GInputStream from the given @base_stream, with
316  * a buffer set to the default size (4 kilobytes).
317  *
318  * Returns: a #GInputStream for the given @base_stream.
319  */
320 GInputStream *
321 g_buffered_input_stream_new (GInputStream *base_stream)
322 {
323   GInputStream *stream;
324
325   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
326
327   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
328                          "base-stream", base_stream,
329                          NULL);
330
331   return stream;
332 }
333
334 /**
335  * g_buffered_input_stream_new_sized:
336  * @base_stream: a #GInputStream
337  * @size: a #gsize
338  *
339  * Creates a new #GBufferedInputStream from the given @base_stream,
340  * with a buffer set to @size.
341  *
342  * Returns: a #GInputStream.
343  */
344 GInputStream *
345 g_buffered_input_stream_new_sized (GInputStream *base_stream,
346                                    gsize         size)
347 {
348   GInputStream *stream;
349
350   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
351
352   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
353                          "base-stream", base_stream,
354                          "buffer-size", (guint)size,
355                          NULL);
356
357   return stream;
358 }
359
360 /**
361  * g_buffered_input_stream_fill:
362  * @stream: a #GBufferedInputStream
363  * @count: the number of bytes that will be read from the stream
364  * @cancellable: optional #GCancellable object, %NULL to ignore
365  * @error: location to store the error occuring, or %NULL to ignore
366  *
367  * Tries to read @count bytes from the stream into the buffer.
368  * Will block during this read.
369  *
370  * If @count is zero, returns zero and does nothing. A value of @count
371  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
372  *
373  * On success, the number of bytes read into the buffer is returned.
374  * It is not an error if this is not the same as the requested size, as it
375  * can happen e.g. near the end of a file. Zero is returned on end of file
376  * (or if @count is zero),  but never otherwise.
377  *
378  * If @count is -1 then the attempted read size is equal to the number of
379  * bytes that are required to fill the buffer.
380  *
381  * If @cancellable is not %NULL, then the operation can be cancelled by
382  * triggering the cancellable object from another thread. If the operation
383  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
384  * operation was partially finished when the operation was cancelled the
385  * partial result will be returned, without an error.
386  *
387  * On error -1 is returned and @error is set accordingly.
388  *
389  * For the asynchronous, non-blocking, version of this function, see
390  * g_buffered_input_stream_fill_async().
391  *
392  * Returns: the number of bytes read into @stream's buffer, up to @count,
393  *     or -1 on error.
394  */
395 gssize
396 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
397                               gssize                 count,
398                               GCancellable          *cancellable,
399                               GError               **error)
400 {
401   GBufferedInputStreamClass *class;
402   GInputStream *input_stream;
403   gssize res;
404
405   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
406
407   input_stream = G_INPUT_STREAM (stream);
408
409   if (count < -1)
410     {
411       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
412                    _("Too large count value passed to %s"), G_STRFUNC);
413       return -1;
414     }
415
416   if (!g_input_stream_set_pending (input_stream, error))
417     return -1;
418
419   if (cancellable)
420     g_cancellable_push_current (cancellable);
421
422   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
423   res = class->fill (stream, count, cancellable, error);
424
425   if (cancellable)
426     g_cancellable_pop_current (cancellable);
427
428   g_input_stream_clear_pending (input_stream);
429
430   return res;
431 }
432
433 static void
434 async_fill_callback_wrapper (GObject      *source_object,
435                              GAsyncResult *res,
436                              gpointer      user_data)
437 {
438   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
439
440   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
441   (*stream->priv->outstanding_callback) (source_object, res, user_data);
442   g_object_unref (stream);
443 }
444
445 /**
446  * g_buffered_input_stream_fill_async:
447  * @stream: a #GBufferedInputStream
448  * @count: the number of bytes that will be read from the stream
449  * @io_priority: the <link linkend="io-priority">I/O priority</link>
450  *     of the request
451  * @cancellable: optional #GCancellable object
452  * @callback: a #GAsyncReadyCallback
453  * @user_data: a #gpointer
454  *
455  * Reads data into @stream's buffer asynchronously, up to @count size.
456  * @io_priority can be used to prioritize reads. For the synchronous
457  * version of this function, see g_buffered_input_stream_fill().
458  *
459  * If @count is -1 then the attempted read size is equal to the number
460  * of bytes that are required to fill the buffer.
461  */
462 void
463 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
464                                     gssize                count,
465                                     int                   io_priority,
466                                     GCancellable         *cancellable,
467                                     GAsyncReadyCallback   callback,
468                                     gpointer              user_data)
469 {
470   GBufferedInputStreamClass *class;
471   GSimpleAsyncResult *simple;
472   GError *error = NULL;
473
474   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
475
476   if (count == 0)
477     {
478       simple = g_simple_async_result_new (G_OBJECT (stream),
479                                           callback,
480                                           user_data,
481                                           g_buffered_input_stream_fill_async);
482       g_simple_async_result_complete_in_idle (simple);
483       g_object_unref (simple);
484       return;
485     }
486
487   if (count < -1)
488     {
489       g_simple_async_report_error_in_idle (G_OBJECT (stream),
490                                            callback,
491                                            user_data,
492                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
493                                            _("Too large count value passed to %s"),
494                                            G_STRFUNC);
495       return;
496     }
497
498   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
499     {
500       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
501                                             callback,
502                                             user_data,
503                                             error);
504       g_error_free (error);
505       return;
506     }
507
508   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
509
510   stream->priv->outstanding_callback = callback;
511   g_object_ref (stream);
512   class->fill_async (stream, count, io_priority, cancellable,
513                      async_fill_callback_wrapper, user_data);
514 }
515
516 /**
517  * g_buffered_input_stream_fill_finish:
518  * @stream: a #GBufferedInputStream
519  * @result: a #GAsyncResult
520  * @error: a #GError
521  *
522  * Finishes an asynchronous read.
523  *
524  * Returns: a #gssize of the read stream, or %-1 on an error.
525  */
526 gssize
527 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
528                                      GAsyncResult          *result,
529                                      GError               **error)
530 {
531   GSimpleAsyncResult *simple;
532   GBufferedInputStreamClass *class;
533
534   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
535   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
536
537   if (G_IS_SIMPLE_ASYNC_RESULT (result))
538     {
539       simple = G_SIMPLE_ASYNC_RESULT (result);
540       if (g_simple_async_result_propagate_error (simple, error))
541         return -1;
542
543       /* Special case read of 0 bytes */
544       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
545         return 0;
546     }
547
548   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
549   return class->fill_finish (stream, result, error);
550 }
551
552 /**
553  * g_buffered_input_stream_get_available:
554  * @stream: #GBufferedInputStream
555  *
556  * Gets the size of the available data within the stream.
557  *
558  * Returns: size of the available stream.
559  */
560 gsize
561 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
562 {
563   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
564
565   return stream->priv->end - stream->priv->pos;
566 }
567
568 /**
569  * g_buffered_input_stream_peek:
570  * @stream: a #GBufferedInputStream
571  * @buffer: a pointer to an allocated chunk of memory
572  * @offset: a #gsize
573  * @count: a #gsize
574  *
575  * Peeks in the buffer, copying data of size @count into @buffer,
576  * offset @offset bytes.
577  *
578  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
579  */
580 gsize
581 g_buffered_input_stream_peek (GBufferedInputStream *stream,
582                               void                 *buffer,
583                               gsize                 offset,
584                               gsize                 count)
585 {
586   gsize available;
587   gsize end;
588
589   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
590   g_return_val_if_fail (buffer != NULL, -1);
591
592   available = g_buffered_input_stream_get_available (stream);
593
594   if (offset > available)
595     return 0;
596
597   end = MIN (offset + count, available);
598   count = end - offset;
599
600   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
601   return count;
602 }
603
604 /**
605  * g_buffered_input_stream_peek_buffer:
606  * @stream: a #GBufferedInputStream
607  * @count: a #gsize to get the number of bytes available in the buffer
608  *
609  * Returns the buffer with the currently available bytes. The returned
610  * buffer must not be modified and will become invalid when reading from
611  * the stream or filling the buffer.
612  *
613  * Returns: read-only buffer
614  */
615 const void*
616 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
617                                      gsize                *count)
618 {
619   GBufferedInputStreamPrivate *priv;
620
621   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
622
623   priv = stream->priv;
624
625   if (count)
626     *count = priv->end - priv->pos;
627
628   return priv->buffer + priv->pos;
629 }
630
631 static void
632 compact_buffer (GBufferedInputStream *stream)
633 {
634   GBufferedInputStreamPrivate *priv;
635   gsize current_size;
636
637   priv = stream->priv;
638
639   current_size = priv->end - priv->pos;
640
641   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
642
643   priv->pos = 0;
644   priv->end = current_size;
645 }
646
647 static gssize
648 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
649                                    gssize                 count,
650                                    GCancellable          *cancellable,
651                                    GError               **error)
652 {
653   GBufferedInputStreamPrivate *priv;
654   GInputStream *base_stream;
655   gssize nread;
656   gsize in_buffer;
657
658   priv = stream->priv;
659
660   if (count == -1)
661     count = priv->len;
662
663   in_buffer = priv->end - priv->pos;
664
665   /* Never fill more than can fit in the buffer */
666   count = MIN (count, priv->len - in_buffer);
667
668   /* If requested length does not fit at end, compact */
669   if (priv->len - priv->end < count)
670     compact_buffer (stream);
671
672   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
673   nread = g_input_stream_read (base_stream,
674                                priv->buffer + priv->end,
675                                count,
676                                cancellable,
677                                error);
678
679   if (nread > 0)
680     priv->end += nread;
681
682   return nread;
683 }
684
685 static gssize
686 g_buffered_input_stream_skip (GInputStream  *stream,
687                               gsize          count,
688                               GCancellable  *cancellable,
689                               GError       **error)
690 {
691   GBufferedInputStream        *bstream;
692   GBufferedInputStreamPrivate *priv;
693   GBufferedInputStreamClass *class;
694   GInputStream *base_stream;
695   gsize available, bytes_skipped;
696   gssize nread;
697
698   bstream = G_BUFFERED_INPUT_STREAM (stream);
699   priv = bstream->priv;
700
701   available = priv->end - priv->pos;
702
703   if (count <= available)
704     {
705       priv->pos += count;
706       return count;
707     }
708
709   /* Full request not available, skip all currently available and
710    * request refill for more
711    */
712
713   priv->pos = 0;
714   priv->end = 0;
715   bytes_skipped = available;
716   count -= available;
717
718   if (bytes_skipped > 0)
719     error = NULL; /* Ignore further errors if we already read some data */
720
721   if (count > priv->len)
722     {
723       /* Large request, shortcut buffer */
724
725       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
726
727       nread = g_input_stream_skip (base_stream,
728                                    count,
729                                    cancellable,
730                                    error);
731
732       if (nread < 0 && bytes_skipped == 0)
733         return -1;
734
735       if (nread > 0)
736         bytes_skipped += nread;
737
738       return bytes_skipped;
739     }
740
741   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
742   nread = class->fill (bstream, priv->len, cancellable, error);
743
744   if (nread < 0)
745     {
746       if (bytes_skipped == 0)
747         return -1;
748       else
749         return bytes_skipped;
750     }
751
752   available = priv->end - priv->pos;
753   count = MIN (count, available);
754
755   bytes_skipped += count;
756   priv->pos += count;
757
758   return bytes_skipped;
759 }
760
761 static gssize
762 g_buffered_input_stream_read (GInputStream *stream,
763                               void         *buffer,
764                               gsize         count,
765                               GCancellable *cancellable,
766                               GError      **error)
767 {
768   GBufferedInputStream        *bstream;
769   GBufferedInputStreamPrivate *priv;
770   GBufferedInputStreamClass *class;
771   GInputStream *base_stream;
772   gsize available, bytes_read;
773   gssize nread;
774
775   bstream = G_BUFFERED_INPUT_STREAM (stream);
776   priv = bstream->priv;
777
778   available = priv->end - priv->pos;
779
780   if (count <= available)
781     {
782       memcpy (buffer, priv->buffer + priv->pos, count);
783       priv->pos += count;
784       return count;
785     }
786
787   /* Full request not available, read all currently available and
788    * request refill for more
789    */
790
791   memcpy (buffer, priv->buffer + priv->pos, available);
792   priv->pos = 0;
793   priv->end = 0;
794   bytes_read = available;
795   count -= available;
796
797   if (bytes_read > 0)
798     error = NULL; /* Ignore further errors if we already read some data */
799
800   if (count > priv->len)
801     {
802       /* Large request, shortcut buffer */
803
804       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
805
806       nread = g_input_stream_read (base_stream,
807                                    (char *)buffer + bytes_read,
808                                    count,
809                                    cancellable,
810                                    error);
811
812       if (nread < 0 && bytes_read == 0)
813         return -1;
814
815       if (nread > 0)
816         bytes_read += nread;
817
818       return bytes_read;
819     }
820
821   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
822   nread = class->fill (bstream, priv->len, cancellable, error);
823   if (nread < 0)
824     {
825       if (bytes_read == 0)
826         return -1;
827       else
828         return bytes_read;
829     }
830
831   available = priv->end - priv->pos;
832   count = MIN (count, available);
833
834   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
835   bytes_read += count;
836   priv->pos += count;
837
838   return bytes_read;
839 }
840
841 /**
842  * g_buffered_input_stream_read_byte:
843  * @stream: a #GBufferedInputStream
844  * @cancellable: optional #GCancellable object, %NULL to ignore
845  * @error: location to store the error occuring, or %NULL to ignore
846  *
847  * Tries to read a single byte from the stream or the buffer. Will block
848  * during this read.
849  *
850  * On success, the byte read from the stream is returned. On end of stream
851  * -1 is returned but it's not an exceptional error and @error is not set.
852  *
853  * If @cancellable is not %NULL, then the operation can be cancelled by
854  * triggering the cancellable object from another thread. If the operation
855  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
856  * operation was partially finished when the operation was cancelled the
857  * partial result will be returned, without an error.
858  *
859  * On error -1 is returned and @error is set accordingly.
860  *
861  * Returns: the byte read from the @stream, or -1 on end of stream or error.
862  */
863 int
864 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
865                                    GCancellable          *cancellable,
866                                    GError               **error)
867 {
868   GBufferedInputStreamPrivate *priv;
869   GBufferedInputStreamClass *class;
870   GInputStream *input_stream;
871   gsize available;
872   gssize nread;
873
874   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
875
876   priv = stream->priv;
877   input_stream = G_INPUT_STREAM (stream);
878
879   if (g_input_stream_is_closed (input_stream))
880     {
881       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
882                            _("Stream is already closed"));
883       return -1;
884     }
885
886   if (!g_input_stream_set_pending (input_stream, error))
887     return -1;
888
889   available = priv->end - priv->pos;
890
891   if (available != 0)
892     {
893       g_input_stream_clear_pending (input_stream);
894       return priv->buffer[priv->pos++];
895     }
896
897   /* Byte not available, request refill for more */
898
899   if (cancellable)
900     g_cancellable_push_current (cancellable);
901
902   priv->pos = 0;
903   priv->end = 0;
904
905   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
906   nread = class->fill (stream, priv->len, cancellable, error);
907
908   if (cancellable)
909     g_cancellable_pop_current (cancellable);
910
911   g_input_stream_clear_pending (input_stream);
912
913   if (nread <= 0)
914     return -1; /* error or end of stream */
915
916   return priv->buffer[priv->pos++];
917 }
918
919 /* ************************** */
920 /* Async stuff implementation */
921 /* ************************** */
922
923 static void
924 fill_async_callback (GObject      *source_object,
925                      GAsyncResult *result,
926                      gpointer      user_data)
927 {
928   GError *error;
929   gssize res;
930   GSimpleAsyncResult *simple;
931
932   simple = user_data;
933
934   error = NULL;
935   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
936                                     result, &error);
937
938   g_simple_async_result_set_op_res_gssize (simple, res);
939   if (res == -1)
940     {
941       g_simple_async_result_set_from_error (simple, error);
942       g_error_free (error);
943     }
944   else
945     {
946       GBufferedInputStreamPrivate *priv;
947       GObject *object;
948
949       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
950       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
951
952       g_assert_cmpint (priv->end + res, <=, priv->len);
953       priv->end += res;
954
955       g_object_unref (object);
956     }
957
958   /* Complete immediately, not in idle, since we're already
959    * in a mainloop callout
960    */
961   g_simple_async_result_complete (simple);
962   g_object_unref (simple);
963 }
964
965 static void
966 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
967                                          gssize                count,
968                                          int                   io_priority,
969                                          GCancellable         *cancellable,
970                                          GAsyncReadyCallback   callback,
971                                          gpointer              user_data)
972 {
973   GBufferedInputStreamPrivate *priv;
974   GInputStream *base_stream;
975   GSimpleAsyncResult *simple;
976   gsize in_buffer;
977
978   priv = stream->priv;
979
980   if (count == -1)
981     count = priv->len;
982
983   in_buffer = priv->end - priv->pos;
984
985   /* Never fill more than can fit in the buffer */
986   count = MIN (count, priv->len - in_buffer);
987
988   /* If requested length does not fit at end, compact */
989   if (priv->len - priv->end < count)
990     compact_buffer (stream);
991
992   simple = g_simple_async_result_new (G_OBJECT (stream),
993                                       callback, user_data,
994                                       g_buffered_input_stream_real_fill_async);
995
996   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
997   g_input_stream_read_async (base_stream,
998                              priv->buffer + priv->end,
999                              count,
1000                              io_priority,
1001                              cancellable,
1002                              fill_async_callback,
1003                              simple);
1004 }
1005
1006 static gssize
1007 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1008                                           GAsyncResult         *result,
1009                                           GError              **error)
1010 {
1011   GSimpleAsyncResult *simple;
1012   gssize nread;
1013
1014   simple = G_SIMPLE_ASYNC_RESULT (result);
1015   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1016
1017   nread = g_simple_async_result_get_op_res_gssize (simple);
1018   return nread;
1019 }
1020
1021 typedef struct
1022 {
1023   gssize bytes_read;
1024   gssize count;
1025   void *buffer;
1026 } ReadAsyncData;
1027
1028 static void
1029 free_read_async_data (gpointer _data)
1030 {
1031   ReadAsyncData *data = _data;
1032   g_slice_free (ReadAsyncData, data);
1033 }
1034
1035 static void
1036 large_read_callback (GObject *source_object,
1037                      GAsyncResult *result,
1038                      gpointer user_data)
1039 {
1040   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1041   ReadAsyncData *data;
1042   GError *error;
1043   gssize nread;
1044
1045   data = g_simple_async_result_get_op_res_gpointer (simple);
1046
1047   error = NULL;
1048   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1049                                       result, &error);
1050
1051   /* Only report the error if we've not already read some data */
1052   if (nread < 0 && data->bytes_read == 0)
1053     g_simple_async_result_set_from_error (simple, error);
1054
1055   if (nread > 0)
1056     data->bytes_read += nread;
1057
1058   if (error)
1059     g_error_free (error);
1060
1061   /* Complete immediately, not in idle, since we're already
1062    * in a mainloop callout
1063    */
1064   g_simple_async_result_complete (simple);
1065   g_object_unref (simple);
1066 }
1067
1068 static void
1069 read_fill_buffer_callback (GObject      *source_object,
1070                            GAsyncResult *result,
1071                            gpointer      user_data)
1072 {
1073   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1074   GBufferedInputStream *bstream;
1075   GBufferedInputStreamPrivate *priv;
1076   ReadAsyncData *data;
1077   GError *error;
1078   gssize nread;
1079   gsize available;
1080
1081   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1082   priv = bstream->priv;
1083
1084   data = g_simple_async_result_get_op_res_gpointer (simple);
1085
1086   error = NULL;
1087   nread = g_buffered_input_stream_fill_finish (bstream,
1088                                                result, &error);
1089
1090   if (nread < 0 && data->bytes_read == 0)
1091     g_simple_async_result_set_from_error (simple, error);
1092
1093
1094   if (nread > 0)
1095     {
1096       available = priv->end - priv->pos;
1097       data->count = MIN (data->count, available);
1098
1099       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1100       data->bytes_read += data->count;
1101       priv->pos += data->count;
1102     }
1103
1104   if (error)
1105     g_error_free (error);
1106
1107   /* Complete immediately, not in idle, since we're already
1108    * in a mainloop callout
1109    */
1110   g_simple_async_result_complete (simple);
1111   g_object_unref (simple);
1112 }
1113
1114 static void
1115 g_buffered_input_stream_read_async (GInputStream        *stream,
1116                                     void                *buffer,
1117                                     gsize                count,
1118                                     int                  io_priority,
1119                                     GCancellable        *cancellable,
1120                                     GAsyncReadyCallback  callback,
1121                                     gpointer             user_data)
1122 {
1123   GBufferedInputStream *bstream;
1124   GBufferedInputStreamPrivate *priv;
1125   GBufferedInputStreamClass *class;
1126   GInputStream *base_stream;
1127   gsize available;
1128   GSimpleAsyncResult *simple;
1129   ReadAsyncData *data;
1130
1131   bstream = G_BUFFERED_INPUT_STREAM (stream);
1132   priv = bstream->priv;
1133
1134   data = g_slice_new (ReadAsyncData);
1135   data->buffer = buffer;
1136   data->bytes_read = 0;
1137   simple = g_simple_async_result_new (G_OBJECT (stream),
1138                                       callback, user_data,
1139                                       g_buffered_input_stream_read_async);
1140   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1141
1142   available = priv->end - priv->pos;
1143
1144   if (count <= available)
1145     {
1146       memcpy (buffer, priv->buffer + priv->pos, count);
1147       priv->pos += count;
1148       data->bytes_read = count;
1149
1150       g_simple_async_result_complete_in_idle (simple);
1151       g_object_unref (simple);
1152       return;
1153     }
1154
1155
1156   /* Full request not available, read all currently available
1157    * and request refill for more
1158    */
1159
1160   memcpy (buffer, priv->buffer + priv->pos, available);
1161   priv->pos = 0;
1162   priv->end = 0;
1163
1164   count -= available;
1165
1166   data->bytes_read = available;
1167   data->count = count;
1168
1169   if (count > priv->len)
1170     {
1171       /* Large request, shortcut buffer */
1172
1173       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1174
1175       g_input_stream_read_async (base_stream,
1176                                  (char *)buffer + data->bytes_read,
1177                                  count,
1178                                  io_priority, cancellable,
1179                                  large_read_callback,
1180                                  simple);
1181     }
1182   else
1183     {
1184       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1185       class->fill_async (bstream, priv->len, io_priority, cancellable,
1186                          read_fill_buffer_callback, simple);
1187     }
1188 }
1189
1190 static gssize
1191 g_buffered_input_stream_read_finish (GInputStream   *stream,
1192                                      GAsyncResult   *result,
1193                                      GError        **error)
1194 {
1195   GSimpleAsyncResult *simple;
1196   ReadAsyncData *data;
1197
1198   simple = G_SIMPLE_ASYNC_RESULT (result);
1199
1200   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1201
1202   data = g_simple_async_result_get_op_res_gpointer (simple);
1203
1204   return data->bytes_read;
1205 }
1206
1207 typedef struct
1208 {
1209   gssize bytes_skipped;
1210   gssize count;
1211 } SkipAsyncData;
1212
1213 static void
1214 free_skip_async_data (gpointer _data)
1215 {
1216   SkipAsyncData *data = _data;
1217   g_slice_free (SkipAsyncData, data);
1218 }
1219
1220 static void
1221 large_skip_callback (GObject      *source_object,
1222                      GAsyncResult *result,
1223                      gpointer      user_data)
1224 {
1225   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1226   SkipAsyncData *data;
1227   GError *error;
1228   gssize nread;
1229
1230   data = g_simple_async_result_get_op_res_gpointer (simple);
1231
1232   error = NULL;
1233   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1234                                       result, &error);
1235
1236   /* Only report the error if we've not already read some data */
1237   if (nread < 0 && data->bytes_skipped == 0)
1238     g_simple_async_result_set_from_error (simple, error);
1239
1240   if (nread > 0)
1241     data->bytes_skipped += nread;
1242
1243   if (error)
1244     g_error_free (error);
1245
1246   /* Complete immediately, not in idle, since we're already
1247    * in a mainloop callout
1248    */
1249   g_simple_async_result_complete (simple);
1250   g_object_unref (simple);
1251 }
1252
1253 static void
1254 skip_fill_buffer_callback (GObject      *source_object,
1255                            GAsyncResult *result,
1256                            gpointer      user_data)
1257 {
1258   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1259   GBufferedInputStream *bstream;
1260   GBufferedInputStreamPrivate *priv;
1261   SkipAsyncData *data;
1262   GError *error;
1263   gssize nread;
1264   gsize available;
1265
1266   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1267   priv = bstream->priv;
1268
1269   data = g_simple_async_result_get_op_res_gpointer (simple);
1270
1271   error = NULL;
1272   nread = g_buffered_input_stream_fill_finish (bstream,
1273                                                result, &error);
1274
1275   if (nread < 0 && data->bytes_skipped == 0)
1276     g_simple_async_result_set_from_error (simple, error);
1277
1278   if (nread > 0)
1279     {
1280       available = priv->end - priv->pos;
1281       data->count = MIN (data->count, available);
1282
1283       data->bytes_skipped += data->count;
1284       priv->pos += data->count;
1285     }
1286
1287   if (error)
1288     g_error_free (error);
1289
1290   /* Complete immediately, not in idle, since we're already
1291    * in a mainloop callout
1292    */
1293   g_simple_async_result_complete (simple);
1294   g_object_unref (simple);
1295 }
1296
1297 static void
1298 g_buffered_input_stream_skip_async (GInputStream        *stream,
1299                                     gsize                count,
1300                                     int                  io_priority,
1301                                     GCancellable        *cancellable,
1302                                     GAsyncReadyCallback  callback,
1303                                     gpointer             user_data)
1304 {
1305   GBufferedInputStream *bstream;
1306   GBufferedInputStreamPrivate *priv;
1307   GBufferedInputStreamClass *class;
1308   GInputStream *base_stream;
1309   gsize available;
1310   GSimpleAsyncResult *simple;
1311   SkipAsyncData *data;
1312
1313   bstream = G_BUFFERED_INPUT_STREAM (stream);
1314   priv = bstream->priv;
1315
1316   data = g_slice_new (SkipAsyncData);
1317   data->bytes_skipped = 0;
1318   simple = g_simple_async_result_new (G_OBJECT (stream),
1319                                       callback, user_data,
1320                                       g_buffered_input_stream_skip_async);
1321   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1322
1323   available = priv->end - priv->pos;
1324
1325   if (count <= available)
1326     {
1327       priv->pos += count;
1328       data->bytes_skipped = count;
1329
1330       g_simple_async_result_complete_in_idle (simple);
1331       g_object_unref (simple);
1332       return;
1333     }
1334
1335   /* Full request not available, skip all currently available
1336    * and request refill for more
1337    */
1338
1339   priv->pos = 0;
1340   priv->end = 0;
1341
1342   count -= available;
1343
1344   data->bytes_skipped = available;
1345   data->count = count;
1346
1347   if (count > priv->len)
1348     {
1349       /* Large request, shortcut buffer */
1350
1351       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1352
1353       g_input_stream_skip_async (base_stream,
1354                                  count,
1355                                  io_priority, cancellable,
1356                                  large_skip_callback,
1357                                  simple);
1358     }
1359   else
1360     {
1361       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1362       class->fill_async (bstream, priv->len, io_priority, cancellable,
1363                          skip_fill_buffer_callback, simple);
1364     }
1365 }
1366
1367 static gssize
1368 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1369                                      GAsyncResult   *result,
1370                                      GError        **error)
1371 {
1372   GSimpleAsyncResult *simple;
1373   SkipAsyncData *data;
1374
1375   simple = G_SIMPLE_ASYNC_RESULT (result);
1376
1377   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1378
1379   data = g_simple_async_result_get_op_res_gpointer (simple);
1380
1381   return data->bytes_skipped;
1382 }
1383
1384
1385 #define __G_BUFFERED_INPUT_STREAM_C__
1386 #include "gioaliasdef.c"