Don't cheat and unset the "pending" flag around inner calls. Instead, call
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @see_also: #GFilterInputStream, #GInputStream
37  * 
38  * Buffered input stream implements #GFilterInputStream and provides 
39  * for buffered reads. 
40  * 
41  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered input stream, use g_buffered_input_stream_new(), 
44  * or g_buffered_input_stream_new_sized() to specify the buffer's size at 
45  * construction.
46  * 
47  * To get the size of a buffer within a buffered input stream, use 
48  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
49  * buffered input stream's buffer, use
50  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size 
51  * cannot be reduced below the size of the data within the buffer.
52  *
53  **/
54
55
56
57 #define DEFAULT_BUFFER_SIZE 4096
58
59 struct _GBufferedInputStreamPrivate {
60   guint8 *buffer;
61   gsize   len;
62   gsize   pos;
63   gsize   end;
64   GAsyncReadyCallback outstanding_callback;
65 };
66
67 enum {
68   PROP_0,
69   PROP_BUFSIZE
70 };
71
72 static void g_buffered_input_stream_set_property  (GObject      *object,
73                                                    guint         prop_id,
74                                                    const GValue *value,
75                                                    GParamSpec   *pspec);
76
77 static void g_buffered_input_stream_get_property  (GObject      *object,
78                                                    guint         prop_id,
79                                                    GValue       *value,
80                                                    GParamSpec   *pspec);
81 static void g_buffered_input_stream_finalize      (GObject *object);
82
83
84 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
85                                                         gsize                  count,
86                                                         GCancellable          *cancellable,
87                                                         GError               **error);
88 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
89                                                         gsize                  count,
90                                                         int                    io_priority,
91                                                         GCancellable          *cancellable,
92                                                         GAsyncReadyCallback    callback,
93                                                         gpointer               user_data);
94 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
95                                                         GAsyncResult          *result,
96                                                         GError               **error);
97 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
98                                                         void                  *buffer,
99                                                         gsize                  count,
100                                                         GCancellable          *cancellable,
101                                                         GError               **error);
102 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
103                                                         void                  *buffer,
104                                                         gsize                  count,
105                                                         int                    io_priority,
106                                                         GCancellable          *cancellable,
107                                                         GAsyncReadyCallback    callback,
108                                                         gpointer               user_data);
109 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
110                                                         GAsyncResult          *result,
111                                                         GError               **error);
112 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
113                                                         gssize                 count,
114                                                         GCancellable          *cancellable,
115                                                         GError               **error);
116 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
117                                                         gssize                 count,
118                                                         int                    io_priority,
119                                                         GCancellable          *cancellable,
120                                                         GAsyncReadyCallback    callback,
121                                                         gpointer               user_data);
122 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
123                                                         GAsyncResult          *result,
124                                                         GError               **error);
125
126 static void compact_buffer (GBufferedInputStream *stream);
127
128 G_DEFINE_TYPE (GBufferedInputStream,
129                g_buffered_input_stream,
130                G_TYPE_FILTER_INPUT_STREAM)
131
132
133 static void
134 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
135 {
136   GObjectClass *object_class;
137   GInputStreamClass *istream_class;
138   GBufferedInputStreamClass *bstream_class;
139
140   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
141
142   object_class = G_OBJECT_CLASS (klass);
143   object_class->get_property = g_buffered_input_stream_get_property;
144   object_class->set_property = g_buffered_input_stream_set_property;
145   object_class->finalize     = g_buffered_input_stream_finalize;
146
147   istream_class = G_INPUT_STREAM_CLASS (klass);
148   istream_class->skip = g_buffered_input_stream_skip;
149   istream_class->skip_async  = g_buffered_input_stream_skip_async;
150   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
151   istream_class->read = g_buffered_input_stream_read;
152   istream_class->read_async  = g_buffered_input_stream_read_async;
153   istream_class->read_finish = g_buffered_input_stream_read_finish;
154
155   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
156   bstream_class->fill = g_buffered_input_stream_real_fill;
157   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
158   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
159   
160   g_object_class_install_property (object_class,
161                                    PROP_BUFSIZE,
162                                    g_param_spec_uint ("buffer-size",
163                                                       P_("Buffer Size"),
164                                                       P_("The size of the backend buffer"),
165                                                       1,
166                                                       G_MAXUINT,
167                                                       DEFAULT_BUFFER_SIZE,
168                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
169                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
170
171
172 }
173
174 /**
175  * g_buffered_input_stream_get_buffer_size:
176  * @stream: #GBufferedInputStream.
177  * 
178  * Gets the size of the input buffer.
179  * 
180  * Returns: the current buffer size, or %-1 on error.
181  **/
182 gsize
183 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
184 {
185   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
186
187   return stream->priv->len;
188 }
189
190 /**
191  * g_buffered_input_stream_set_buffer_size:
192  * @stream: #GBufferedInputStream.
193  * @size: a #gsize.
194  *
195  * Sets the size of the internal buffer of @stream to @size, or to the 
196  * size of the contents of the buffer. The buffer can never be resized 
197  * smaller than its current contents.
198  **/
199 void
200 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
201                                          gsize                  size)
202 {
203   GBufferedInputStreamPrivate *priv;
204   gsize in_buffer;
205   guint8 *buffer;
206   
207   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
208
209   priv = stream->priv;
210
211   if (priv->len == size)
212     return;
213
214   if (priv->buffer)
215     {
216       in_buffer = priv->end - priv->pos;
217
218       /* Never resize smaller than current buffer contents */
219       size = MAX (size, in_buffer);
220
221       buffer = g_malloc (size);
222       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
223       priv->len = size;
224       priv->pos = 0;
225       priv->end = in_buffer;
226       g_free (priv->buffer);
227       priv->buffer = buffer;
228     }
229   else
230     {
231       priv->len = size;
232       priv->pos = 0;
233       priv->end = 0;
234       priv->buffer = g_malloc (size);
235     }
236
237   g_object_notify (G_OBJECT (stream), "buffer-size");
238 }
239
240 static void
241 g_buffered_input_stream_set_property (GObject      *object,
242                                       guint         prop_id,
243                                       const GValue *value,
244                                       GParamSpec   *pspec)
245 {
246   GBufferedInputStreamPrivate *priv;
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250   priv = bstream->priv;
251
252   switch (prop_id) 
253     {
254     case PROP_BUFSIZE:
255       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
256       break;
257
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
260       break;
261     }
262
263 }
264
265 static void
266 g_buffered_input_stream_get_property (GObject    *object,
267                                       guint       prop_id,
268                                       GValue     *value,
269                                       GParamSpec *pspec)
270 {
271   GBufferedInputStreamPrivate *priv;
272   GBufferedInputStream        *bstream;
273
274   bstream = G_BUFFERED_INPUT_STREAM (object);
275   priv = bstream->priv;
276
277   switch (prop_id)
278     { 
279     case PROP_BUFSIZE:
280       g_value_set_uint (value, priv->len);
281       break;
282
283     default:
284       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
285       break;
286     }
287 }
288
289 static void
290 g_buffered_input_stream_finalize (GObject *object)
291 {
292   GBufferedInputStreamPrivate *priv;
293   GBufferedInputStream        *stream;
294
295   stream = G_BUFFERED_INPUT_STREAM (object);
296   priv = stream->priv;
297
298   g_free (priv->buffer);
299
300   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
301     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
302 }
303
304 static void
305 g_buffered_input_stream_init (GBufferedInputStream *stream)
306 {
307   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
308                                               G_TYPE_BUFFERED_INPUT_STREAM,
309                                               GBufferedInputStreamPrivate);
310 }
311
312
313 /**
314  * g_buffered_input_stream_new:
315  * @base_stream: a #GInputStream.
316  * 
317  * Creates a new #GInputStream from the given @base_stream, with 
318  * a buffer set to the default size (4 kilobytes).
319  *
320  * Returns: a #GInputStream for the given @base_stream.
321  **/
322 GInputStream *
323 g_buffered_input_stream_new (GInputStream *base_stream)
324 {
325   GInputStream *stream;
326
327   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
328
329   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
330                          "base-stream", base_stream,
331                          NULL);
332
333   return stream;
334 }
335
336 /**
337  * g_buffered_input_stream_new_sized:
338  * @base_stream: a #GOutputStream.
339  * @size: a #gsize.
340  * 
341  * Creates a new #GBufferedInputStream from the given @base_stream, 
342  * with a buffer set to @size.
343  *
344  * Returns: a #GInputStream.
345  **/
346 GInputStream *
347 g_buffered_input_stream_new_sized (GInputStream *base_stream,
348                                    gsize         size)
349 {
350   GInputStream *stream;
351
352   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
353
354   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
355                          "base-stream", base_stream,
356                          "buffer-size", (guint)size,
357                          NULL);
358
359   return stream;
360 }
361
362 /**
363  * g_buffered_input_stream_fill:
364  * @stream: #GBufferedInputStream.
365  * @count: the number of bytes that will be read from the stream.
366  * @cancellable: optional #GCancellable object, %NULL to ignore.
367  * @error: location to store the error occuring, or %NULL to ignore.
368  *
369  * Tries to read @count bytes from the stream into the buffer. 
370  * Will block during this read.
371  * 
372  * If @count is zero, returns zero and does nothing. A value of @count
373  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
374  *
375  * On success, the number of bytes read into the buffer is returned.
376  * It is not an error if this is not the same as the requested size, as it
377  * can happen e.g. near the end of a file. Zero is returned on end of file
378  * (or if @count is zero),  but never otherwise.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  * 
388  * For the asynchronous, non-blocking, version of this function, see 
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count, 
392  *     or -1 on error.
393  **/
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405   
406   input_stream = G_INPUT_STREAM (stream);
407   
408   if (g_input_stream_is_closed (input_stream))
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
411                    _("Stream is already closed"));
412       return -1;
413     }
414   
415   if (g_input_stream_has_pending (input_stream))
416     {
417       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
418                    _("Stream has outstanding operation"));
419       return -1;
420     }
421       
422   g_input_stream_set_pending (input_stream, TRUE);
423
424   if (cancellable)
425     g_push_current_cancellable (cancellable);
426   
427   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
428   res = class->fill (stream, count, cancellable, error);
429
430   if (cancellable)
431     g_pop_current_cancellable (cancellable);
432   
433   g_input_stream_set_pending (input_stream, FALSE);
434   
435   return res;
436 }
437
438 static void
439 async_fill_callback_wrapper (GObject      *source_object,
440                              GAsyncResult *res,
441                              gpointer      user_data)
442 {
443   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
444
445   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
446   (*stream->priv->outstanding_callback) (source_object, res, user_data);
447   g_object_unref (stream);
448 }
449
450 /**
451  * g_buffered_input_stream_fill_async:
452  * @stream: #GBufferedInputStream.
453  * @count: a #gssize.
454  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
455  *     of the request.
456  * @cancellable: optional #GCancellable object
457  * @callback: a #GAsyncReadyCallback.
458  * @user_data: a #gpointer.
459  *
460  * Reads data into @stream's buffer asynchronously, up to @count size.
461  * @io_priority can be used to prioritize reads. For the synchronous
462  * version of this function, see g_buffered_input_stream_fill().
463  **/
464 void
465 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
466                                     gssize                count,
467                                     int                   io_priority,
468                                     GCancellable         *cancellable,
469                                     GAsyncReadyCallback   callback,
470                                     gpointer              user_data)
471 {
472   GBufferedInputStreamClass *class;
473   GSimpleAsyncResult *simple;
474
475   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
476
477   if (count == 0)
478     {
479       simple = g_simple_async_result_new (G_OBJECT (stream),
480                                           callback,
481                                           user_data,
482                                           g_buffered_input_stream_fill_async);
483       g_simple_async_result_complete_in_idle (simple);
484       g_object_unref (simple);
485       return;
486     }
487   
488   if (((gssize) count) < 0)
489     {
490       g_simple_async_report_error_in_idle (G_OBJECT (stream),
491                                            callback,
492                                            user_data,
493                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
494                                            _("Too large count value passed to g_input_stream_read_async"));
495       return;
496     }
497   
498   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
499     {
500       g_simple_async_report_error_in_idle (G_OBJECT (stream),
501                                            callback,
502                                            user_data,
503                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
504                                            _("Stream is already closed"));
505       return;
506     }
507     
508   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
509     {
510       g_simple_async_report_error_in_idle (G_OBJECT (stream),
511                                            callback,
512                                            user_data,
513                                            G_IO_ERROR, G_IO_ERROR_PENDING,
514                                            _("Stream has outstanding operation"));
515       return;
516     }
517
518   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
519   
520   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
521   stream->priv->outstanding_callback = callback;
522   g_object_ref (stream);
523   class->fill_async (stream, count, io_priority, cancellable,
524                      async_fill_callback_wrapper, user_data);
525 }
526
527 /**
528  * g_buffered_input_stream_fill_finish:
529  * @stream: a #GBufferedInputStream.
530  * @result: a #GAsyncResult.
531  * @error: a #GError.
532  *
533  * Finishes an asynchronous read.
534  * 
535  * Returns: a #gssize of the read stream, or %-1 on an error. 
536  **/
537 gssize
538 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
539                                      GAsyncResult          *result,
540                                      GError               **error)
541 {
542   GSimpleAsyncResult *simple;
543   GBufferedInputStreamClass *class;
544
545   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
546   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
547
548   if (G_IS_SIMPLE_ASYNC_RESULT (result))
549     {
550       simple = G_SIMPLE_ASYNC_RESULT (result);
551       if (g_simple_async_result_propagate_error (simple, error))
552         return -1;
553
554       /* Special case read of 0 bytes */
555       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
556         return 0;
557     }
558
559   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
560   return class->fill_finish (stream, result, error);
561 }
562
563 /**
564  * g_buffered_input_stream_get_available:
565  * @stream: #GBufferedInputStream.
566  * 
567  * Gets the size of the available data within the stream.
568  * 
569  * Returns: size of the available stream. 
570  **/
571 gsize
572 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
573 {
574   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
575
576   return stream->priv->end - stream->priv->pos;
577 }
578
579 /**
580  * g_buffered_input_stream_peek:
581  * @stream: a #GBufferedInputStream.
582  * @buffer: a pointer to an allocated chunk of memory.
583  * @offset: a #gsize.
584  * @count: a #gsize.
585  * 
586  * Peeks in the buffer, copying data of size @count into @buffer, 
587  * offset @offset bytes.
588  * 
589  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
590  **/
591 gsize
592 g_buffered_input_stream_peek (GBufferedInputStream *stream,
593                               void                 *buffer,
594                               gsize                 offset,
595                               gsize                 count)
596 {
597   gsize available;
598   gsize end;
599   
600   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
601   g_return_val_if_fail (buffer != NULL, -1);
602
603   available = g_buffered_input_stream_get_available (stream);
604
605   if (offset > available)
606     return 0;
607   
608   end = MIN (offset + count, available);
609   count = end - offset;
610   
611   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
612   return count;
613 }
614
615 /**
616  * g_buffered_input_stream_peek_buffer:
617  * @stream: a #GBufferedInputStream.
618  * @count: a #gsize to get the number of bytes available in the buffer.
619  *
620  * Returns the buffer with the currently available bytes. The returned
621  * buffer must not be modified and will become invalid when reading from
622  * the stream or filling the buffer.
623  *
624  * Returns: read-only buffer
625  **/
626 const void*
627 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
628                                      gsize                *count)
629 {
630   GBufferedInputStreamPrivate *priv;
631
632   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
633
634   priv = stream->priv;
635
636   if (count) 
637     *count = priv->end - priv->pos;
638
639   return priv->buffer + priv->pos;
640 }
641
642 static void
643 compact_buffer (GBufferedInputStream *stream)
644 {
645   GBufferedInputStreamPrivate *priv;
646   gsize current_size;
647
648   priv = stream->priv;
649
650   current_size = priv->end - priv->pos;
651   
652   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
653   
654   priv->pos = 0;
655   priv->end = current_size;
656 }
657
658 static gssize
659 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
660                                    gssize                 count,
661                                    GCancellable          *cancellable,
662                                    GError               **error)
663 {
664   GBufferedInputStreamPrivate *priv;
665   GInputStream *base_stream;
666   gssize nread;
667   gsize in_buffer;
668
669   priv = stream->priv;
670
671   if (count == -1)
672     count = priv->len;
673   
674   in_buffer = priv->end - priv->pos;
675
676   /* Never fill more than can fit in the buffer */
677   count = MIN (count, priv->len - in_buffer);
678
679   /* If requested length does not fit at end, compact */
680   if (priv->len - priv->end < count)
681     compact_buffer (stream);
682
683   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
684   nread = g_input_stream_read (base_stream,
685                                priv->buffer + priv->end,
686                                count,
687                                cancellable,
688                                error);
689
690   if (nread > 0)
691     priv->end += nread;
692   
693   return nread;
694 }
695
696 static gssize
697 g_buffered_input_stream_skip (GInputStream  *stream,
698                               gsize          count,
699                               GCancellable  *cancellable,
700                               GError       **error)
701 {
702   GBufferedInputStream        *bstream;
703   GBufferedInputStreamPrivate *priv;
704   GBufferedInputStreamClass *class;
705   GInputStream *base_stream;
706   gsize available, bytes_skipped;
707   gssize nread;
708
709   bstream = G_BUFFERED_INPUT_STREAM (stream);
710   priv = bstream->priv;
711
712   available = priv->end - priv->pos;
713
714   if (count <= available)
715     {
716       priv->pos += count;
717       return count;
718     }
719
720   /* Full request not available, skip all currently available and 
721    * request refill for more 
722    */
723   
724   priv->pos = 0;
725   priv->end = 0;
726   bytes_skipped = available;
727   count -= available;
728
729   if (bytes_skipped > 0)
730     error = NULL; /* Ignore further errors if we already read some data */
731   
732   if (count > priv->len)
733     {
734       /* Large request, shortcut buffer */
735       
736       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
737
738       nread = g_input_stream_skip (base_stream,
739                                    count,
740                                    cancellable,
741                                    error);
742       
743       if (nread < 0 && bytes_skipped == 0)
744         return -1;
745       
746       if (nread > 0)
747         bytes_skipped += nread;
748       
749       return bytes_skipped;
750     }
751   
752   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
753   nread = class->fill (bstream, priv->len, cancellable, error);
754   
755   if (nread < 0)
756     {
757       if (bytes_skipped == 0)
758         return -1;
759       else
760         return bytes_skipped;
761     }
762   
763   available = priv->end - priv->pos;
764   count = MIN (count, available);
765   
766   bytes_skipped += count;
767   priv->pos += count;
768   
769   return bytes_skipped;
770 }
771
772 static gssize
773 g_buffered_input_stream_read (GInputStream *stream,
774                               void         *buffer,
775                               gsize         count,
776                               GCancellable *cancellable,
777                               GError      **error)
778 {
779   GBufferedInputStream        *bstream;
780   GBufferedInputStreamPrivate *priv;
781   GBufferedInputStreamClass *class;
782   GInputStream *base_stream;
783   gsize available, bytes_read;
784   gssize nread;
785
786   bstream = G_BUFFERED_INPUT_STREAM (stream);
787   priv = bstream->priv;
788
789   available = priv->end - priv->pos;
790
791   if (count <= available)
792     {
793       memcpy (buffer, priv->buffer + priv->pos, count);
794       priv->pos += count;
795       return count;
796     }
797   
798   /* Full request not available, read all currently availbile and request refill for more */
799   
800   memcpy (buffer, priv->buffer + priv->pos, available);
801   priv->pos = 0;
802   priv->end = 0;
803   bytes_read = available;
804   count -= available;
805
806   if (bytes_read > 0)
807     error = NULL; /* Ignore further errors if we already read some data */
808   
809   if (count > priv->len)
810     {
811       /* Large request, shortcut buffer */
812       
813       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
814
815       nread = g_input_stream_read (base_stream,
816                                    (char *)buffer + bytes_read,
817                                    count,
818                                    cancellable,
819                                    error);
820       
821       if (nread < 0 && bytes_read == 0)
822         return -1;
823       
824       if (nread > 0)
825         bytes_read += nread;
826       
827       return bytes_read;
828     }
829   
830   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
831   nread = class->fill (bstream, priv->len, cancellable, error);
832   if (nread < 0)
833     {
834       if (bytes_read == 0)
835         return -1;
836       else
837         return bytes_read;
838     }
839   
840   available = priv->end - priv->pos;
841   count = MIN (count, available);
842   
843   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
844   bytes_read += count;
845   priv->pos += count;
846   
847   return bytes_read;
848 }
849
850 /**
851  * g_buffered_input_stream_read_byte:
852  * @stream: #GBufferedInputStream.
853  * @cancellable: optional #GCancellable object, %NULL to ignore.
854  * @error: location to store the error occuring, or %NULL to ignore.
855  *
856  * Tries to read a single byte from the stream or the buffer. Will block
857  * during this read.
858  *
859  * On success, the byte read from the stream is returned. On end of stream
860  * -1 is returned but it's not an exceptional error and @error is not set.
861  * 
862  * If @cancellable is not %NULL, then the operation can be cancelled by
863  * triggering the cancellable object from another thread. If the operation
864  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
865  * operation was partially finished when the operation was cancelled the
866  * partial result will be returned, without an error.
867  *
868  * On error -1 is returned and @error is set accordingly.
869  * 
870  * Returns: the byte read from the @stream, or -1 on end of stream or error.
871  **/
872 int
873 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
874                                    GCancellable          *cancellable,
875                                    GError               **error)
876 {
877   GBufferedInputStreamPrivate *priv;
878   GBufferedInputStreamClass *class;
879   GInputStream *input_stream;
880   gsize available;
881   gssize nread;
882
883   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
884
885   priv = stream->priv;
886   input_stream = G_INPUT_STREAM (stream);
887
888   if (g_input_stream_is_closed (input_stream))
889     {
890       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
891        _("Stream is already closed"));
892       return -1;
893     }
894
895   if (g_input_stream_has_pending (input_stream))
896     {
897       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
898        _("Stream has outstanding operation"));
899       return -1;
900     }
901
902   available = priv->end - priv->pos;
903
904   if (available < 1)
905     return priv->buffer[priv->pos++];
906
907   /* Byte not available, request refill for more */
908
909   g_input_stream_set_pending (input_stream, TRUE);
910
911   if (cancellable)
912     g_push_current_cancellable (cancellable);
913
914   priv->pos = 0;
915   priv->end = 0;
916
917   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
918   nread = class->fill (stream, priv->len, cancellable, error);
919
920   if (cancellable)
921     g_pop_current_cancellable (cancellable);
922
923   g_input_stream_set_pending (input_stream, FALSE);
924
925   if (nread <= 0)
926     return -1; /* error or end of stream */
927
928   return priv->buffer[priv->pos++];
929 }
930
931 /* ************************** */
932 /* Async stuff implementation */
933 /* ************************** */
934
935 static void
936 fill_async_callback (GObject      *source_object,
937                      GAsyncResult *result,
938                      gpointer      user_data)
939 {
940   GError *error;
941   gssize res;
942   GSimpleAsyncResult *simple;
943
944   simple = user_data;
945   
946   error = NULL;
947   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
948                                     result, &error);
949
950   g_simple_async_result_set_op_res_gssize (simple, res);
951   if (res == -1)
952     {
953       g_simple_async_result_set_from_error (simple, error);
954       g_error_free (error);
955     }
956   
957   /* Complete immediately, not in idle, since we're already in a mainloop callout */
958   g_simple_async_result_complete (simple);
959   g_object_unref (simple);
960 }
961
962 static void
963 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
964                                          gssize                count,
965                                          int                   io_priority,
966                                          GCancellable         *cancellable,
967                                          GAsyncReadyCallback   callback,
968                                          gpointer              user_data)
969 {
970   GBufferedInputStreamPrivate *priv;
971   GInputStream *base_stream;
972   GSimpleAsyncResult *simple;
973   gsize in_buffer;
974
975   priv = stream->priv;
976
977   if (count == -1)
978     count = priv->len;
979   
980   in_buffer = priv->end - priv->pos;
981
982   /* Never fill more than can fit in the buffer */
983   count = MIN (count, priv->len - in_buffer);
984
985   /* If requested length does not fit at end, compact */
986   if (priv->len - priv->end < count)
987     compact_buffer (stream);
988
989   simple = g_simple_async_result_new (G_OBJECT (stream),
990                                       callback, user_data,
991                                       g_buffered_input_stream_real_fill_async);
992   
993   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
994   g_input_stream_read_async (base_stream,
995                              priv->buffer + priv->end,
996                              count,
997                              io_priority,
998                              cancellable,
999                              fill_async_callback,
1000                              simple);
1001 }
1002
1003 static gssize
1004 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1005                                           GAsyncResult         *result,
1006                                           GError              **error)
1007 {
1008   GSimpleAsyncResult *simple;
1009   gssize nread;
1010
1011   simple = G_SIMPLE_ASYNC_RESULT (result);
1012   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1013   
1014   nread = g_simple_async_result_get_op_res_gssize (simple);
1015   return nread;
1016 }
1017
1018 typedef struct {
1019   gssize bytes_read;
1020   gssize count;
1021   void *buffer;
1022 } ReadAsyncData;
1023
1024 static void 
1025 free_read_async_data (gpointer _data)
1026 {
1027   ReadAsyncData *data = _data;
1028   g_slice_free (ReadAsyncData, data);
1029 }
1030
1031 static void
1032 large_read_callback (GObject *source_object,
1033                      GAsyncResult *result,
1034                      gpointer user_data)
1035 {
1036   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1037   ReadAsyncData *data;
1038   GError *error;
1039   gssize nread;
1040
1041   data = g_simple_async_result_get_op_res_gpointer (simple);
1042   
1043   error = NULL;
1044   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1045                                       result, &error);
1046
1047   /* Only report the error if we've not already read some data */
1048   if (nread < 0 && data->bytes_read == 0)
1049     g_simple_async_result_set_from_error (simple, error);
1050   
1051   if (nread > 0)
1052     data->bytes_read += nread;
1053   
1054   if (error)
1055     g_error_free (error);
1056   
1057   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1058   g_simple_async_result_complete (simple);
1059   g_object_unref (simple);
1060 }
1061
1062 static void
1063 read_fill_buffer_callback (GObject *source_object,
1064                            GAsyncResult *result,
1065                            gpointer user_data)
1066 {
1067   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1068   GBufferedInputStream *bstream;
1069   GBufferedInputStreamPrivate *priv;
1070   ReadAsyncData *data;
1071   GError *error;
1072   gssize nread;
1073   gsize available;
1074
1075   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1076   priv = bstream->priv;
1077   
1078   data = g_simple_async_result_get_op_res_gpointer (simple);
1079   
1080   error = NULL;
1081   nread = g_buffered_input_stream_fill_finish (bstream,
1082                                                result, &error);
1083   
1084   if (nread < 0 && data->bytes_read == 0)
1085     g_simple_async_result_set_from_error (simple, error);
1086
1087
1088   if (nread > 0)
1089     {
1090       available = priv->end - priv->pos;
1091       data->count = MIN (data->count, available);
1092       
1093       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1094       data->bytes_read += data->count;
1095       priv->pos += data->count;
1096     }
1097
1098   if (error)
1099     g_error_free (error);
1100   
1101   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1102   g_simple_async_result_complete (simple);
1103   g_object_unref (simple);
1104 }
1105
1106 static void
1107 g_buffered_input_stream_read_async (GInputStream              *stream,
1108                                     void                      *buffer,
1109                                     gsize                      count,
1110                                     int                        io_priority,
1111                                     GCancellable              *cancellable,
1112                                     GAsyncReadyCallback        callback,
1113                                     gpointer                   user_data)
1114 {
1115   GBufferedInputStream *bstream;
1116   GBufferedInputStreamPrivate *priv;
1117   GBufferedInputStreamClass *class;
1118   GInputStream *base_stream;
1119   gsize available;
1120   GSimpleAsyncResult *simple;
1121   ReadAsyncData *data;
1122
1123   bstream = G_BUFFERED_INPUT_STREAM (stream);
1124   priv = bstream->priv;
1125
1126   data = g_slice_new (ReadAsyncData);
1127   data->buffer = buffer;
1128   data->bytes_read = 0;
1129   simple = g_simple_async_result_new (G_OBJECT (stream),
1130                                       callback, user_data,
1131                                       g_buffered_input_stream_read_async);
1132   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1133   
1134   available = priv->end - priv->pos;
1135   
1136   if (count <= available)
1137     {
1138       memcpy (buffer, priv->buffer + priv->pos, count);
1139       priv->pos += count;
1140       data->bytes_read = count;
1141       
1142       g_simple_async_result_complete_in_idle (simple);
1143       g_object_unref (simple);
1144       return;
1145     }
1146
1147
1148   /* Full request not available, read all currently availbile and request refill for more */
1149   
1150   memcpy (buffer, priv->buffer + priv->pos, available);
1151   priv->pos = 0;
1152   priv->end = 0;
1153   
1154   count -= available;
1155   
1156   data->bytes_read = available;
1157   data->count = count;
1158
1159   if (count > priv->len)
1160     {
1161       /* Large request, shortcut buffer */
1162       
1163       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1164       
1165       g_input_stream_read_async (base_stream,
1166                                  (char *)buffer + data->bytes_read,
1167                                  count,
1168                                  io_priority, cancellable,
1169                                  large_read_callback,
1170                                  simple);
1171     }
1172   else
1173     {
1174       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1175       class->fill_async (bstream, priv->len, io_priority, cancellable,
1176                          read_fill_buffer_callback, simple);
1177     }
1178 }
1179
1180 static gssize
1181 g_buffered_input_stream_read_finish (GInputStream   *stream,
1182                                      GAsyncResult   *result,
1183                                      GError        **error)
1184 {
1185   GSimpleAsyncResult *simple;
1186   ReadAsyncData *data;
1187   
1188   simple = G_SIMPLE_ASYNC_RESULT (result);
1189   
1190   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1191
1192   data = g_simple_async_result_get_op_res_gpointer (simple);
1193   
1194   return data->bytes_read;
1195 }
1196
1197 typedef struct {
1198   gssize bytes_skipped;
1199   gssize count;
1200 } SkipAsyncData;
1201
1202 static void 
1203 free_skip_async_data (gpointer _data)
1204 {
1205   SkipAsyncData *data = _data;
1206   g_slice_free (SkipAsyncData, data);
1207 }
1208
1209 static void
1210 large_skip_callback (GObject *source_object,
1211                      GAsyncResult *result,
1212                      gpointer user_data)
1213 {
1214   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1215   SkipAsyncData *data;
1216   GError *error;
1217   gssize nread;
1218
1219   data = g_simple_async_result_get_op_res_gpointer (simple);
1220   
1221   error = NULL;
1222   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1223                                       result, &error);
1224
1225   /* Only report the error if we've not already read some data */
1226   if (nread < 0 && data->bytes_skipped == 0)
1227     g_simple_async_result_set_from_error (simple, error);
1228   
1229   if (nread > 0)
1230     data->bytes_skipped += nread;
1231   
1232   if (error)
1233     g_error_free (error);
1234   
1235   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1236   g_simple_async_result_complete (simple);
1237   g_object_unref (simple);
1238 }
1239
1240 static void
1241 skip_fill_buffer_callback (GObject *source_object,
1242                            GAsyncResult *result,
1243                            gpointer user_data)
1244 {
1245   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1246   GBufferedInputStream *bstream;
1247   GBufferedInputStreamPrivate *priv;
1248   SkipAsyncData *data;
1249   GError *error;
1250   gssize nread;
1251   gsize available;
1252
1253   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1254   priv = bstream->priv;
1255   
1256   data = g_simple_async_result_get_op_res_gpointer (simple);
1257   
1258   error = NULL;
1259   nread = g_buffered_input_stream_fill_finish (bstream,
1260                                                result, &error);
1261   
1262   if (nread < 0 && data->bytes_skipped == 0)
1263     g_simple_async_result_set_from_error (simple, error);
1264
1265
1266   if (nread > 0)
1267     {
1268       available = priv->end - priv->pos;
1269       data->count = MIN (data->count, available);
1270       
1271       data->bytes_skipped += data->count;
1272       priv->pos += data->count;
1273     }
1274
1275   if (error)
1276     g_error_free (error);
1277   
1278   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1279   g_simple_async_result_complete (simple);
1280   g_object_unref (simple);
1281 }
1282
1283 static void
1284 g_buffered_input_stream_skip_async (GInputStream              *stream,
1285                                     gsize                      count,
1286                                     int                        io_priority,
1287                                     GCancellable              *cancellable,
1288                                     GAsyncReadyCallback        callback,
1289                                     gpointer                   user_data)
1290 {
1291   GBufferedInputStream *bstream;
1292   GBufferedInputStreamPrivate *priv;
1293   GBufferedInputStreamClass *class;
1294   GInputStream *base_stream;
1295   gsize available;
1296   GSimpleAsyncResult *simple;
1297   SkipAsyncData *data;
1298
1299   bstream = G_BUFFERED_INPUT_STREAM (stream);
1300   priv = bstream->priv;
1301
1302   data = g_slice_new (SkipAsyncData);
1303   data->bytes_skipped = 0;
1304   simple = g_simple_async_result_new (G_OBJECT (stream),
1305                                       callback, user_data,
1306                                       g_buffered_input_stream_skip_async);
1307   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1308   
1309   available = priv->end - priv->pos;
1310   
1311   if (count <= available)
1312     {
1313       priv->pos += count;
1314       data->bytes_skipped = count;
1315       
1316       g_simple_async_result_complete_in_idle (simple);
1317       g_object_unref (simple);
1318       return;
1319     }
1320
1321
1322   /* Full request not available, skip all currently availbile and request refill for more */
1323   
1324   priv->pos = 0;
1325   priv->end = 0;
1326   
1327   count -= available;
1328   
1329   data->bytes_skipped = available;
1330   data->count = count;
1331
1332   if (count > priv->len)
1333     {
1334       /* Large request, shortcut buffer */
1335       
1336       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1337       
1338       g_input_stream_skip_async (base_stream,
1339                                  count,
1340                                  io_priority, cancellable,
1341                                  large_skip_callback,
1342                                  simple);
1343     }
1344   else
1345     {
1346       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1347       class->fill_async (bstream, priv->len, io_priority, cancellable,
1348                          skip_fill_buffer_callback, simple);
1349     }
1350 }
1351
1352 static gssize
1353 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1354                                      GAsyncResult   *result,
1355                                      GError        **error)
1356 {
1357   GSimpleAsyncResult *simple;
1358   SkipAsyncData *data;
1359   
1360   simple = G_SIMPLE_ASYNC_RESULT (result);
1361   
1362   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1363
1364   data = g_simple_async_result_get_op_res_gpointer (simple);
1365   
1366   return data->bytes_skipped;
1367 }
1368
1369
1370 #define __G_BUFFERED_INPUT_STREAM_C__
1371 #include "gioaliasdef.c"