gio: use GPollable* to implement fallback read_async/write_async
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34
35 /**
36  * SECTION:gbufferedinputstream
37  * @short_description: Buffered Input Stream
38  * @include: gio/gio.h
39  * @see_also: #GFilterInputStream, #GInputStream
40  *
41  * Buffered input stream implements #GFilterInputStream and provides
42  * for buffered reads.
43  *
44  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45  *
46  * To create a buffered input stream, use g_buffered_input_stream_new(),
47  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
48  * construction.
49  *
50  * To get the size of a buffer within a buffered input stream, use
51  * g_buffered_input_stream_get_buffer_size(). To change the size of a
52  * buffered input stream's buffer, use
53  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
54  * cannot be reduced below the size of the data within the buffer.
55  */
56
57
/* Default value, in bytes, of the "buffer-size" property (4 kilobytes). */
#define DEFAULT_BUFFER_SIZE 4096
59
/* Private per-instance state of a GBufferedInputStream.
 *
 * The unread data occupies buffer[pos .. end); end - pos is the number
 * of bytes currently available without touching the base stream.
 */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer;  /* backing storage, allocated with g_malloc() */
  gsize   len;     /* allocated size of buffer, in bytes */
  gsize   pos;     /* offset of the next unread byte in buffer */
  gsize   end;     /* offset just past the last valid byte in buffer */
  GAsyncReadyCallback outstanding_callback;  /* caller's callback for the fill_async in flight */
};
67
/* GObject property ids; PROP_BUFSIZE identifies the "buffer-size" property. */
enum {
  PROP_0,
  PROP_BUFSIZE
};
72
/* GObject virtual methods implemented by this class. */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream virtual methods (skip/read) followed by the default
 * GBufferedInputStream fill implementations.
 */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);

/* Moves the unread bytes to the start of the buffer. */
static void compact_buffer (GBufferedInputStream *stream);
118
/* Registers GBufferedInputStream with G_TYPE_FILTER_INPUT_STREAM as its
 * parent type; the class_init and init functions are defined below.
 */
G_DEFINE_TYPE (GBufferedInputStream,
               g_buffered_input_stream,
               G_TYPE_FILTER_INPUT_STREAM)
122
123
124 static void
125 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
126 {
127   GObjectClass *object_class;
128   GInputStreamClass *istream_class;
129   GBufferedInputStreamClass *bstream_class;
130
131   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
132
133   object_class = G_OBJECT_CLASS (klass);
134   object_class->get_property = g_buffered_input_stream_get_property;
135   object_class->set_property = g_buffered_input_stream_set_property;
136   object_class->finalize     = g_buffered_input_stream_finalize;
137
138   istream_class = G_INPUT_STREAM_CLASS (klass);
139   istream_class->skip = g_buffered_input_stream_skip;
140   istream_class->skip_async  = g_buffered_input_stream_skip_async;
141   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
142   istream_class->read_fn = g_buffered_input_stream_read;
143
144   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
145   bstream_class->fill = g_buffered_input_stream_real_fill;
146   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
147   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
148
149   g_object_class_install_property (object_class,
150                                    PROP_BUFSIZE,
151                                    g_param_spec_uint ("buffer-size",
152                                                       P_("Buffer Size"),
153                                                       P_("The size of the backend buffer"),
154                                                       1,
155                                                       G_MAXUINT,
156                                                       DEFAULT_BUFFER_SIZE,
157                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
158                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
159
160
161 }
162
163 /**
164  * g_buffered_input_stream_get_buffer_size:
165  * @stream: a #GBufferedInputStream
166  *
167  * Gets the size of the input buffer.
168  *
169  * Returns: the current buffer size.
170  */
171 gsize
172 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
173 {
174   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
175
176   return stream->priv->len;
177 }
178
179 /**
180  * g_buffered_input_stream_set_buffer_size:
181  * @stream: a #GBufferedInputStream
182  * @size: a #gsize
183  *
184  * Sets the size of the internal buffer of @stream to @size, or to the
185  * size of the contents of the buffer. The buffer can never be resized
186  * smaller than its current contents.
187  */
188 void
189 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
190                                          gsize                 size)
191 {
192   GBufferedInputStreamPrivate *priv;
193   gsize in_buffer;
194   guint8 *buffer;
195
196   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
197
198   priv = stream->priv;
199
200   if (priv->len == size)
201     return;
202
203   if (priv->buffer)
204     {
205       in_buffer = priv->end - priv->pos;
206
207       /* Never resize smaller than current buffer contents */
208       size = MAX (size, in_buffer);
209
210       buffer = g_malloc (size);
211       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
212       priv->len = size;
213       priv->pos = 0;
214       priv->end = in_buffer;
215       g_free (priv->buffer);
216       priv->buffer = buffer;
217     }
218   else
219     {
220       priv->len = size;
221       priv->pos = 0;
222       priv->end = 0;
223       priv->buffer = g_malloc (size);
224     }
225
226   g_object_notify (G_OBJECT (stream), "buffer-size");
227 }
228
229 static void
230 g_buffered_input_stream_set_property (GObject      *object,
231                                       guint         prop_id,
232                                       const GValue *value,
233                                       GParamSpec   *pspec)
234 {
235   GBufferedInputStream        *bstream;
236
237   bstream = G_BUFFERED_INPUT_STREAM (object);
238
239   switch (prop_id)
240     {
241     case PROP_BUFSIZE:
242       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
243       break;
244
245     default:
246       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
247       break;
248     }
249 }
250
251 static void
252 g_buffered_input_stream_get_property (GObject    *object,
253                                       guint       prop_id,
254                                       GValue     *value,
255                                       GParamSpec *pspec)
256 {
257   GBufferedInputStreamPrivate *priv;
258   GBufferedInputStream        *bstream;
259
260   bstream = G_BUFFERED_INPUT_STREAM (object);
261   priv = bstream->priv;
262
263   switch (prop_id)
264     {
265     case PROP_BUFSIZE:
266       g_value_set_uint (value, priv->len);
267       break;
268
269     default:
270       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
271       break;
272     }
273 }
274
275 static void
276 g_buffered_input_stream_finalize (GObject *object)
277 {
278   GBufferedInputStreamPrivate *priv;
279   GBufferedInputStream        *stream;
280
281   stream = G_BUFFERED_INPUT_STREAM (object);
282   priv = stream->priv;
283
284   g_free (priv->buffer);
285
286   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
287 }
288
289 static void
290 g_buffered_input_stream_init (GBufferedInputStream *stream)
291 {
292   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
293                                               G_TYPE_BUFFERED_INPUT_STREAM,
294                                               GBufferedInputStreamPrivate);
295 }
296
297
298 /**
299  * g_buffered_input_stream_new:
300  * @base_stream: a #GInputStream
301  *
302  * Creates a new #GInputStream from the given @base_stream, with
303  * a buffer set to the default size (4 kilobytes).
304  *
305  * Returns: a #GInputStream for the given @base_stream.
306  */
307 GInputStream *
308 g_buffered_input_stream_new (GInputStream *base_stream)
309 {
310   GInputStream *stream;
311
312   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
313
314   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
315                          "base-stream", base_stream,
316                          NULL);
317
318   return stream;
319 }
320
321 /**
322  * g_buffered_input_stream_new_sized:
323  * @base_stream: a #GInputStream
324  * @size: a #gsize
325  *
326  * Creates a new #GBufferedInputStream from the given @base_stream,
327  * with a buffer set to @size.
328  *
329  * Returns: a #GInputStream.
330  */
331 GInputStream *
332 g_buffered_input_stream_new_sized (GInputStream *base_stream,
333                                    gsize         size)
334 {
335   GInputStream *stream;
336
337   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
338
339   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
340                          "base-stream", base_stream,
341                          "buffer-size", (guint)size,
342                          NULL);
343
344   return stream;
345 }
346
347 /**
348  * g_buffered_input_stream_fill:
349  * @stream: a #GBufferedInputStream
350  * @count: the number of bytes that will be read from the stream
351  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
352  * @error: location to store the error occurring, or %NULL to ignore
353  *
354  * Tries to read @count bytes from the stream into the buffer.
355  * Will block during this read.
356  *
357  * If @count is zero, returns zero and does nothing. A value of @count
358  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
359  *
360  * On success, the number of bytes read into the buffer is returned.
361  * It is not an error if this is not the same as the requested size, as it
362  * can happen e.g. near the end of a file. Zero is returned on end of file
363  * (or if @count is zero),  but never otherwise.
364  *
365  * If @count is -1 then the attempted read size is equal to the number of
366  * bytes that are required to fill the buffer.
367  *
368  * If @cancellable is not %NULL, then the operation can be cancelled by
369  * triggering the cancellable object from another thread. If the operation
370  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
371  * operation was partially finished when the operation was cancelled the
372  * partial result will be returned, without an error.
373  *
374  * On error -1 is returned and @error is set accordingly.
375  *
376  * For the asynchronous, non-blocking, version of this function, see
377  * g_buffered_input_stream_fill_async().
378  *
379  * Returns: the number of bytes read into @stream's buffer, up to @count,
380  *     or -1 on error.
381  */
382 gssize
383 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
384                               gssize                 count,
385                               GCancellable          *cancellable,
386                               GError               **error)
387 {
388   GBufferedInputStreamClass *class;
389   GInputStream *input_stream;
390   gssize res;
391
392   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
393
394   input_stream = G_INPUT_STREAM (stream);
395
396   if (count < -1)
397     {
398       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
399                    _("Too large count value passed to %s"), G_STRFUNC);
400       return -1;
401     }
402
403   if (!g_input_stream_set_pending (input_stream, error))
404     return -1;
405
406   if (cancellable)
407     g_cancellable_push_current (cancellable);
408
409   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
410   res = class->fill (stream, count, cancellable, error);
411
412   if (cancellable)
413     g_cancellable_pop_current (cancellable);
414
415   g_input_stream_clear_pending (input_stream);
416
417   return res;
418 }
419
420 static void
421 async_fill_callback_wrapper (GObject      *source_object,
422                              GAsyncResult *res,
423                              gpointer      user_data)
424 {
425   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
426
427   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
428   (*stream->priv->outstanding_callback) (source_object, res, user_data);
429   g_object_unref (stream);
430 }
431
432 /**
433  * g_buffered_input_stream_fill_async:
434  * @stream: a #GBufferedInputStream
435  * @count: the number of bytes that will be read from the stream
436  * @io_priority: the <link linkend="io-priority">I/O priority</link>
437  *     of the request
438  * @cancellable: (allow-none): optional #GCancellable object
439  * @callback: (scope async): a #GAsyncReadyCallback
440  * @user_data: (closure): a #gpointer
441  *
442  * Reads data into @stream's buffer asynchronously, up to @count size.
443  * @io_priority can be used to prioritize reads. For the synchronous
444  * version of this function, see g_buffered_input_stream_fill().
445  *
446  * If @count is -1 then the attempted read size is equal to the number
447  * of bytes that are required to fill the buffer.
448  */
449 void
450 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
451                                     gssize                count,
452                                     int                   io_priority,
453                                     GCancellable         *cancellable,
454                                     GAsyncReadyCallback   callback,
455                                     gpointer              user_data)
456 {
457   GBufferedInputStreamClass *class;
458   GSimpleAsyncResult *simple;
459   GError *error = NULL;
460
461   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
462
463   if (count == 0)
464     {
465       simple = g_simple_async_result_new (G_OBJECT (stream),
466                                           callback,
467                                           user_data,
468                                           g_buffered_input_stream_fill_async);
469       g_simple_async_result_complete_in_idle (simple);
470       g_object_unref (simple);
471       return;
472     }
473
474   if (count < -1)
475     {
476       g_simple_async_report_error_in_idle (G_OBJECT (stream),
477                                            callback,
478                                            user_data,
479                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
480                                            _("Too large count value passed to %s"),
481                                            G_STRFUNC);
482       return;
483     }
484
485   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
486     {
487       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
488                                             callback,
489                                             user_data,
490                                             error);
491       return;
492     }
493
494   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
495
496   stream->priv->outstanding_callback = callback;
497   g_object_ref (stream);
498   class->fill_async (stream, count, io_priority, cancellable,
499                      async_fill_callback_wrapper, user_data);
500 }
501
502 /**
503  * g_buffered_input_stream_fill_finish:
504  * @stream: a #GBufferedInputStream
505  * @result: a #GAsyncResult
506  * @error: a #GError
507  *
508  * Finishes an asynchronous read.
509  *
510  * Returns: a #gssize of the read stream, or %-1 on an error.
511  */
512 gssize
513 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
514                                      GAsyncResult          *result,
515                                      GError               **error)
516 {
517   GSimpleAsyncResult *simple;
518   GBufferedInputStreamClass *class;
519
520   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
521   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
522
523   if (G_IS_SIMPLE_ASYNC_RESULT (result))
524     {
525       simple = G_SIMPLE_ASYNC_RESULT (result);
526       if (g_simple_async_result_propagate_error (simple, error))
527         return -1;
528
529       /* Special case read of 0 bytes */
530       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
531         return 0;
532     }
533
534   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
535   return class->fill_finish (stream, result, error);
536 }
537
538 /**
539  * g_buffered_input_stream_get_available:
540  * @stream: #GBufferedInputStream
541  *
542  * Gets the size of the available data within the stream.
543  *
544  * Returns: size of the available stream.
545  */
546 gsize
547 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
548 {
549   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
550
551   return stream->priv->end - stream->priv->pos;
552 }
553
554 /**
555  * g_buffered_input_stream_peek:
556  * @stream: a #GBufferedInputStream
557  * @buffer: (array length=count) (element-type guint8): a pointer to
558  *   an allocated chunk of memory
559  * @offset: a #gsize
560  * @count: a #gsize
561  *
562  * Peeks in the buffer, copying data of size @count into @buffer,
563  * offset @offset bytes.
564  *
565  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
566  */
567 gsize
568 g_buffered_input_stream_peek (GBufferedInputStream *stream,
569                               void                 *buffer,
570                               gsize                 offset,
571                               gsize                 count)
572 {
573   gsize available;
574   gsize end;
575
576   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
577   g_return_val_if_fail (buffer != NULL, -1);
578
579   available = g_buffered_input_stream_get_available (stream);
580
581   if (offset > available)
582     return 0;
583
584   end = MIN (offset + count, available);
585   count = end - offset;
586
587   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
588   return count;
589 }
590
591 /**
592  * g_buffered_input_stream_peek_buffer:
593  * @stream: a #GBufferedInputStream
594  * @count: (out): a #gsize to get the number of bytes available in the buffer
595  *
596  * Returns the buffer with the currently available bytes. The returned
597  * buffer must not be modified and will become invalid when reading from
598  * the stream or filling the buffer.
599  *
600  * Returns: (array length=count) (element-type guint8) (transfer none):
601  *          read-only buffer
602  */
603 const void*
604 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
605                                      gsize                *count)
606 {
607   GBufferedInputStreamPrivate *priv;
608
609   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
610
611   priv = stream->priv;
612
613   if (count)
614     *count = priv->end - priv->pos;
615
616   return priv->buffer + priv->pos;
617 }
618
619 static void
620 compact_buffer (GBufferedInputStream *stream)
621 {
622   GBufferedInputStreamPrivate *priv;
623   gsize current_size;
624
625   priv = stream->priv;
626
627   current_size = priv->end - priv->pos;
628
629   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
630
631   priv->pos = 0;
632   priv->end = current_size;
633 }
634
635 static gssize
636 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
637                                    gssize                 count,
638                                    GCancellable          *cancellable,
639                                    GError               **error)
640 {
641   GBufferedInputStreamPrivate *priv;
642   GInputStream *base_stream;
643   gssize nread;
644   gsize in_buffer;
645
646   priv = stream->priv;
647
648   if (count == -1)
649     count = priv->len;
650
651   in_buffer = priv->end - priv->pos;
652
653   /* Never fill more than can fit in the buffer */
654   count = MIN (count, priv->len - in_buffer);
655
656   /* If requested length does not fit at end, compact */
657   if (priv->len - priv->end < count)
658     compact_buffer (stream);
659
660   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
661   nread = g_input_stream_read (base_stream,
662                                priv->buffer + priv->end,
663                                count,
664                                cancellable,
665                                error);
666
667   if (nread > 0)
668     priv->end += nread;
669
670   return nread;
671 }
672
673 static gssize
674 g_buffered_input_stream_skip (GInputStream  *stream,
675                               gsize          count,
676                               GCancellable  *cancellable,
677                               GError       **error)
678 {
679   GBufferedInputStream        *bstream;
680   GBufferedInputStreamPrivate *priv;
681   GBufferedInputStreamClass *class;
682   GInputStream *base_stream;
683   gsize available, bytes_skipped;
684   gssize nread;
685
686   bstream = G_BUFFERED_INPUT_STREAM (stream);
687   priv = bstream->priv;
688
689   available = priv->end - priv->pos;
690
691   if (count <= available)
692     {
693       priv->pos += count;
694       return count;
695     }
696
697   /* Full request not available, skip all currently available and
698    * request refill for more
699    */
700
701   priv->pos = 0;
702   priv->end = 0;
703   bytes_skipped = available;
704   count -= available;
705
706   if (bytes_skipped > 0)
707     error = NULL; /* Ignore further errors if we already read some data */
708
709   if (count > priv->len)
710     {
711       /* Large request, shortcut buffer */
712
713       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
714
715       nread = g_input_stream_skip (base_stream,
716                                    count,
717                                    cancellable,
718                                    error);
719
720       if (nread < 0 && bytes_skipped == 0)
721         return -1;
722
723       if (nread > 0)
724         bytes_skipped += nread;
725
726       return bytes_skipped;
727     }
728
729   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
730   nread = class->fill (bstream, priv->len, cancellable, error);
731
732   if (nread < 0)
733     {
734       if (bytes_skipped == 0)
735         return -1;
736       else
737         return bytes_skipped;
738     }
739
740   available = priv->end - priv->pos;
741   count = MIN (count, available);
742
743   bytes_skipped += count;
744   priv->pos += count;
745
746   return bytes_skipped;
747 }
748
749 static gssize
750 g_buffered_input_stream_read (GInputStream *stream,
751                               void         *buffer,
752                               gsize         count,
753                               GCancellable *cancellable,
754                               GError      **error)
755 {
756   GBufferedInputStream        *bstream;
757   GBufferedInputStreamPrivate *priv;
758   GBufferedInputStreamClass *class;
759   GInputStream *base_stream;
760   gsize available, bytes_read;
761   gssize nread;
762
763   bstream = G_BUFFERED_INPUT_STREAM (stream);
764   priv = bstream->priv;
765
766   available = priv->end - priv->pos;
767
768   if (count <= available)
769     {
770       memcpy (buffer, priv->buffer + priv->pos, count);
771       priv->pos += count;
772       return count;
773     }
774
775   /* Full request not available, read all currently available and
776    * request refill for more
777    */
778
779   memcpy (buffer, priv->buffer + priv->pos, available);
780   priv->pos = 0;
781   priv->end = 0;
782   bytes_read = available;
783   count -= available;
784
785   if (bytes_read > 0)
786     error = NULL; /* Ignore further errors if we already read some data */
787
788   if (count > priv->len)
789     {
790       /* Large request, shortcut buffer */
791
792       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
793
794       nread = g_input_stream_read (base_stream,
795                                    (char *)buffer + bytes_read,
796                                    count,
797                                    cancellable,
798                                    error);
799
800       if (nread < 0 && bytes_read == 0)
801         return -1;
802
803       if (nread > 0)
804         bytes_read += nread;
805
806       return bytes_read;
807     }
808
809   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
810   nread = class->fill (bstream, priv->len, cancellable, error);
811   if (nread < 0)
812     {
813       if (bytes_read == 0)
814         return -1;
815       else
816         return bytes_read;
817     }
818
819   available = priv->end - priv->pos;
820   count = MIN (count, available);
821
822   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
823   bytes_read += count;
824   priv->pos += count;
825
826   return bytes_read;
827 }
828
829 /**
830  * g_buffered_input_stream_read_byte:
831  * @stream: a #GBufferedInputStream
832  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
833  * @error: location to store the error occurring, or %NULL to ignore
834  *
835  * Tries to read a single byte from the stream or the buffer. Will block
836  * during this read.
837  *
838  * On success, the byte read from the stream is returned. On end of stream
839  * -1 is returned but it's not an exceptional error and @error is not set.
840  *
841  * If @cancellable is not %NULL, then the operation can be cancelled by
842  * triggering the cancellable object from another thread. If the operation
843  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
844  * operation was partially finished when the operation was cancelled the
845  * partial result will be returned, without an error.
846  *
847  * On error -1 is returned and @error is set accordingly.
848  *
849  * Returns: the byte read from the @stream, or -1 on end of stream or error.
850  */
851 int
852 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
853                                    GCancellable          *cancellable,
854                                    GError               **error)
855 {
856   GBufferedInputStreamPrivate *priv;
857   GBufferedInputStreamClass *class;
858   GInputStream *input_stream;
859   gsize available;
860   gssize nread;
861
862   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
863
864   priv = stream->priv;
865   input_stream = G_INPUT_STREAM (stream);
866
867   if (g_input_stream_is_closed (input_stream))
868     {
869       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
870                            _("Stream is already closed"));
871       return -1;
872     }
873
874   if (!g_input_stream_set_pending (input_stream, error))
875     return -1;
876
877   available = priv->end - priv->pos;
878
879   if (available != 0)
880     {
881       g_input_stream_clear_pending (input_stream);
882       return priv->buffer[priv->pos++];
883     }
884
885   /* Byte not available, request refill for more */
886
887   if (cancellable)
888     g_cancellable_push_current (cancellable);
889
890   priv->pos = 0;
891   priv->end = 0;
892
893   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
894   nread = class->fill (stream, priv->len, cancellable, error);
895
896   if (cancellable)
897     g_cancellable_pop_current (cancellable);
898
899   g_input_stream_clear_pending (input_stream);
900
901   if (nread <= 0)
902     return -1; /* error or end of stream */
903
904   return priv->buffer[priv->pos++];
905 }
906
907 /* ************************** */
908 /* Async stuff implementation */
909 /* ************************** */
910
911 static void
912 fill_async_callback (GObject      *source_object,
913                      GAsyncResult *result,
914                      gpointer      user_data)
915 {
916   GError *error;
917   gssize res;
918   GSimpleAsyncResult *simple;
919
920   simple = user_data;
921
922   error = NULL;
923   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
924                                     result, &error);
925
926   g_simple_async_result_set_op_res_gssize (simple, res);
927   if (res == -1)
928     {
929       g_simple_async_result_take_error (simple, error);
930     }
931   else
932     {
933       GBufferedInputStreamPrivate *priv;
934       GObject *object;
935
936       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
937       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
938
939       g_assert_cmpint (priv->end + res, <=, priv->len);
940       priv->end += res;
941
942       g_object_unref (object);
943     }
944
945   /* Complete immediately, not in idle, since we're already
946    * in a mainloop callout
947    */
948   g_simple_async_result_complete (simple);
949   g_object_unref (simple);
950 }
951
952 static void
953 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
954                                          gssize                count,
955                                          int                   io_priority,
956                                          GCancellable         *cancellable,
957                                          GAsyncReadyCallback   callback,
958                                          gpointer              user_data)
959 {
960   GBufferedInputStreamPrivate *priv;
961   GInputStream *base_stream;
962   GSimpleAsyncResult *simple;
963   gsize in_buffer;
964
965   priv = stream->priv;
966
967   if (count == -1)
968     count = priv->len;
969
970   in_buffer = priv->end - priv->pos;
971
972   /* Never fill more than can fit in the buffer */
973   count = MIN (count, priv->len - in_buffer);
974
975   /* If requested length does not fit at end, compact */
976   if (priv->len - priv->end < count)
977     compact_buffer (stream);
978
979   simple = g_simple_async_result_new (G_OBJECT (stream),
980                                       callback, user_data,
981                                       g_buffered_input_stream_real_fill_async);
982
983   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
984   g_input_stream_read_async (base_stream,
985                              priv->buffer + priv->end,
986                              count,
987                              io_priority,
988                              cancellable,
989                              fill_async_callback,
990                              simple);
991 }
992
993 static gssize
994 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
995                                           GAsyncResult         *result,
996                                           GError              **error)
997 {
998   GSimpleAsyncResult *simple;
999   gssize nread;
1000
1001   simple = G_SIMPLE_ASYNC_RESULT (result);
1002   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1003
1004   nread = g_simple_async_result_get_op_res_gssize (simple);
1005   return nread;
1006 }
1007
1008 typedef struct
1009 {
1010   gssize bytes_skipped;
1011   gssize count;
1012 } SkipAsyncData;
1013
1014 static void
1015 free_skip_async_data (gpointer _data)
1016 {
1017   SkipAsyncData *data = _data;
1018   g_slice_free (SkipAsyncData, data);
1019 }
1020
1021 static void
1022 large_skip_callback (GObject      *source_object,
1023                      GAsyncResult *result,
1024                      gpointer      user_data)
1025 {
1026   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1027   SkipAsyncData *data;
1028   GError *error;
1029   gssize nread;
1030
1031   data = g_simple_async_result_get_op_res_gpointer (simple);
1032
1033   error = NULL;
1034   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1035                                       result, &error);
1036
1037   /* Only report the error if we've not already read some data */
1038   if (nread < 0 && data->bytes_skipped == 0)
1039     g_simple_async_result_take_error (simple, error);
1040   else if (error)
1041     g_error_free (error);
1042
1043   if (nread > 0)
1044     data->bytes_skipped += nread;
1045
1046   /* Complete immediately, not in idle, since we're already
1047    * in a mainloop callout
1048    */
1049   g_simple_async_result_complete (simple);
1050   g_object_unref (simple);
1051 }
1052
1053 static void
1054 skip_fill_buffer_callback (GObject      *source_object,
1055                            GAsyncResult *result,
1056                            gpointer      user_data)
1057 {
1058   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1059   GBufferedInputStream *bstream;
1060   GBufferedInputStreamPrivate *priv;
1061   SkipAsyncData *data;
1062   GError *error;
1063   gssize nread;
1064   gsize available;
1065
1066   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1067   priv = bstream->priv;
1068
1069   data = g_simple_async_result_get_op_res_gpointer (simple);
1070
1071   error = NULL;
1072   nread = g_buffered_input_stream_fill_finish (bstream,
1073                                                result, &error);
1074
1075   if (nread < 0 && data->bytes_skipped == 0)
1076     g_simple_async_result_take_error (simple, error);
1077   else if (error)
1078     g_error_free (error);
1079
1080   if (nread > 0)
1081     {
1082       available = priv->end - priv->pos;
1083       data->count = MIN (data->count, available);
1084
1085       data->bytes_skipped += data->count;
1086       priv->pos += data->count;
1087     }
1088
1089   /* Complete immediately, not in idle, since we're already
1090    * in a mainloop callout
1091    */
1092   g_simple_async_result_complete (simple);
1093   g_object_unref (simple);
1094 }
1095
1096 static void
1097 g_buffered_input_stream_skip_async (GInputStream        *stream,
1098                                     gsize                count,
1099                                     int                  io_priority,
1100                                     GCancellable        *cancellable,
1101                                     GAsyncReadyCallback  callback,
1102                                     gpointer             user_data)
1103 {
1104   GBufferedInputStream *bstream;
1105   GBufferedInputStreamPrivate *priv;
1106   GBufferedInputStreamClass *class;
1107   GInputStream *base_stream;
1108   gsize available;
1109   GSimpleAsyncResult *simple;
1110   SkipAsyncData *data;
1111
1112   bstream = G_BUFFERED_INPUT_STREAM (stream);
1113   priv = bstream->priv;
1114
1115   data = g_slice_new (SkipAsyncData);
1116   data->bytes_skipped = 0;
1117   simple = g_simple_async_result_new (G_OBJECT (stream),
1118                                       callback, user_data,
1119                                       g_buffered_input_stream_skip_async);
1120   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1121
1122   available = priv->end - priv->pos;
1123
1124   if (count <= available)
1125     {
1126       priv->pos += count;
1127       data->bytes_skipped = count;
1128
1129       g_simple_async_result_complete_in_idle (simple);
1130       g_object_unref (simple);
1131       return;
1132     }
1133
1134   /* Full request not available, skip all currently available
1135    * and request refill for more
1136    */
1137
1138   priv->pos = 0;
1139   priv->end = 0;
1140
1141   count -= available;
1142
1143   data->bytes_skipped = available;
1144   data->count = count;
1145
1146   if (count > priv->len)
1147     {
1148       /* Large request, shortcut buffer */
1149
1150       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1151
1152       g_input_stream_skip_async (base_stream,
1153                                  count,
1154                                  io_priority, cancellable,
1155                                  large_skip_callback,
1156                                  simple);
1157     }
1158   else
1159     {
1160       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1161       class->fill_async (bstream, priv->len, io_priority, cancellable,
1162                          skip_fill_buffer_callback, simple);
1163     }
1164 }
1165
1166 static gssize
1167 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1168                                      GAsyncResult   *result,
1169                                      GError        **error)
1170 {
1171   GSimpleAsyncResult *simple;
1172   SkipAsyncData *data;
1173
1174   simple = G_SIMPLE_ASYNC_RESULT (result);
1175
1176   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1177
1178   data = g_simple_async_result_get_op_res_gpointer (simple);
1179
1180   return data->bytes_skipped;
1181 }