Use g_simple_async_result_{new_,}take_error
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  *
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org>
22  */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gsimpleasyncresult.h"
30 #include "gioerror.h"
31 #include <string.h>
32 #include "glibintl.h"
33
34
35 /**
36  * SECTION:gbufferedinputstream
37  * @short_description: Buffered Input Stream
38  * @include: gio/gio.h
39  * @see_also: #GFilterInputStream, #GInputStream
40  *
41  * Buffered input stream implements #GFilterInputStream and provides
42  * for buffered reads.
43  *
44  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
45  *
46  * To create a buffered input stream, use g_buffered_input_stream_new(),
47  * or g_buffered_input_stream_new_sized() to specify the buffer's size at
48  * construction.
49  *
50  * To get the size of a buffer within a buffered input stream, use
51  * g_buffered_input_stream_get_buffer_size(). To change the size of a
52  * buffered input stream's buffer, use
53  * g_buffered_input_stream_set_buffer_size(). Note that the buffer's size
54  * cannot be reduced below the size of the data within the buffer.
55  */
56
57
/* Default of the "buffer-size" property, in bytes. */
#define DEFAULT_BUFFER_SIZE 4096

/* Per-instance private state of a GBufferedInputStream.
 *
 * The unread data is the byte range [pos, end) inside @buffer.
 */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer;   /* backing storage, allocated with g_malloc() */
  gsize   len;      /* allocated size of @buffer, in bytes */
  gsize   pos;      /* offset of the first unread byte */
  gsize   end;      /* offset one past the last valid byte */
  GAsyncReadyCallback outstanding_callback; /* caller's callback of the pending fill_async */
};

/* GObject property identifiers. */
enum {
  PROP_0,
  PROP_BUFSIZE  /* the "buffer-size" property */
};
72
/* GObject virtual methods. */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream virtual methods (installed in class_init). */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
/* Default implementations of the GBufferedInputStream fill virtual methods. */
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);

/* Moves the unread bytes to the start of the buffer. */
static void compact_buffer (GBufferedInputStream *stream);

/* Registers GBufferedInputStream as a subtype of GFilterInputStream. */
G_DEFINE_TYPE (GBufferedInputStream,
               g_buffered_input_stream,
               G_TYPE_FILTER_INPUT_STREAM)
132
133
134 static void
135 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
136 {
137   GObjectClass *object_class;
138   GInputStreamClass *istream_class;
139   GBufferedInputStreamClass *bstream_class;
140
141   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
142
143   object_class = G_OBJECT_CLASS (klass);
144   object_class->get_property = g_buffered_input_stream_get_property;
145   object_class->set_property = g_buffered_input_stream_set_property;
146   object_class->finalize     = g_buffered_input_stream_finalize;
147
148   istream_class = G_INPUT_STREAM_CLASS (klass);
149   istream_class->skip = g_buffered_input_stream_skip;
150   istream_class->skip_async  = g_buffered_input_stream_skip_async;
151   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
152   istream_class->read_fn = g_buffered_input_stream_read;
153   istream_class->read_async  = g_buffered_input_stream_read_async;
154   istream_class->read_finish = g_buffered_input_stream_read_finish;
155
156   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
157   bstream_class->fill = g_buffered_input_stream_real_fill;
158   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
159   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
160
161   g_object_class_install_property (object_class,
162                                    PROP_BUFSIZE,
163                                    g_param_spec_uint ("buffer-size",
164                                                       P_("Buffer Size"),
165                                                       P_("The size of the backend buffer"),
166                                                       1,
167                                                       G_MAXUINT,
168                                                       DEFAULT_BUFFER_SIZE,
169                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
170                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172
173 }
174
175 /**
176  * g_buffered_input_stream_get_buffer_size:
177  * @stream: a #GBufferedInputStream
178  *
179  * Gets the size of the input buffer.
180  *
181  * Returns: the current buffer size.
182  */
183 gsize
184 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_input_stream_set_buffer_size:
193  * @stream: a #GBufferedInputStream
194  * @size: a #gsize
195  *
196  * Sets the size of the internal buffer of @stream to @size, or to the
197  * size of the contents of the buffer. The buffer can never be resized
198  * smaller than its current contents.
199  */
200 void
201 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
202                                          gsize                 size)
203 {
204   GBufferedInputStreamPrivate *priv;
205   gsize in_buffer;
206   guint8 *buffer;
207
208   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
209
210   priv = stream->priv;
211
212   if (priv->len == size)
213     return;
214
215   if (priv->buffer)
216     {
217       in_buffer = priv->end - priv->pos;
218
219       /* Never resize smaller than current buffer contents */
220       size = MAX (size, in_buffer);
221
222       buffer = g_malloc (size);
223       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
224       priv->len = size;
225       priv->pos = 0;
226       priv->end = in_buffer;
227       g_free (priv->buffer);
228       priv->buffer = buffer;
229     }
230   else
231     {
232       priv->len = size;
233       priv->pos = 0;
234       priv->end = 0;
235       priv->buffer = g_malloc (size);
236     }
237
238   g_object_notify (G_OBJECT (stream), "buffer-size");
239 }
240
241 static void
242 g_buffered_input_stream_set_property (GObject      *object,
243                                       guint         prop_id,
244                                       const GValue *value,
245                                       GParamSpec   *pspec)
246 {
247   GBufferedInputStream        *bstream;
248
249   bstream = G_BUFFERED_INPUT_STREAM (object);
250
251   switch (prop_id)
252     {
253     case PROP_BUFSIZE:
254       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
255       break;
256
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
259       break;
260     }
261 }
262
263 static void
264 g_buffered_input_stream_get_property (GObject    *object,
265                                       guint       prop_id,
266                                       GValue     *value,
267                                       GParamSpec *pspec)
268 {
269   GBufferedInputStreamPrivate *priv;
270   GBufferedInputStream        *bstream;
271
272   bstream = G_BUFFERED_INPUT_STREAM (object);
273   priv = bstream->priv;
274
275   switch (prop_id)
276     {
277     case PROP_BUFSIZE:
278       g_value_set_uint (value, priv->len);
279       break;
280
281     default:
282       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
283       break;
284     }
285 }
286
287 static void
288 g_buffered_input_stream_finalize (GObject *object)
289 {
290   GBufferedInputStreamPrivate *priv;
291   GBufferedInputStream        *stream;
292
293   stream = G_BUFFERED_INPUT_STREAM (object);
294   priv = stream->priv;
295
296   g_free (priv->buffer);
297
298   G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
299 }
300
301 static void
302 g_buffered_input_stream_init (GBufferedInputStream *stream)
303 {
304   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
305                                               G_TYPE_BUFFERED_INPUT_STREAM,
306                                               GBufferedInputStreamPrivate);
307 }
308
309
310 /**
311  * g_buffered_input_stream_new:
312  * @base_stream: a #GInputStream
313  *
314  * Creates a new #GInputStream from the given @base_stream, with
315  * a buffer set to the default size (4 kilobytes).
316  *
317  * Returns: a #GInputStream for the given @base_stream.
318  */
319 GInputStream *
320 g_buffered_input_stream_new (GInputStream *base_stream)
321 {
322   GInputStream *stream;
323
324   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
325
326   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
327                          "base-stream", base_stream,
328                          NULL);
329
330   return stream;
331 }
332
333 /**
334  * g_buffered_input_stream_new_sized:
335  * @base_stream: a #GInputStream
336  * @size: a #gsize
337  *
338  * Creates a new #GBufferedInputStream from the given @base_stream,
339  * with a buffer set to @size.
340  *
341  * Returns: a #GInputStream.
342  */
343 GInputStream *
344 g_buffered_input_stream_new_sized (GInputStream *base_stream,
345                                    gsize         size)
346 {
347   GInputStream *stream;
348
349   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
350
351   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
352                          "base-stream", base_stream,
353                          "buffer-size", (guint)size,
354                          NULL);
355
356   return stream;
357 }
358
359 /**
360  * g_buffered_input_stream_fill:
361  * @stream: a #GBufferedInputStream
362  * @count: the number of bytes that will be read from the stream
363  * @cancellable: optional #GCancellable object, %NULL to ignore
364  * @error: location to store the error occuring, or %NULL to ignore
365  *
366  * Tries to read @count bytes from the stream into the buffer.
367  * Will block during this read.
368  *
369  * If @count is zero, returns zero and does nothing. A value of @count
370  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
371  *
372  * On success, the number of bytes read into the buffer is returned.
373  * It is not an error if this is not the same as the requested size, as it
374  * can happen e.g. near the end of a file. Zero is returned on end of file
375  * (or if @count is zero),  but never otherwise.
376  *
377  * If @count is -1 then the attempted read size is equal to the number of
378  * bytes that are required to fill the buffer.
379  *
380  * If @cancellable is not %NULL, then the operation can be cancelled by
381  * triggering the cancellable object from another thread. If the operation
382  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
383  * operation was partially finished when the operation was cancelled the
384  * partial result will be returned, without an error.
385  *
386  * On error -1 is returned and @error is set accordingly.
387  *
388  * For the asynchronous, non-blocking, version of this function, see
389  * g_buffered_input_stream_fill_async().
390  *
391  * Returns: the number of bytes read into @stream's buffer, up to @count,
392  *     or -1 on error.
393  */
394 gssize
395 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
396                               gssize                 count,
397                               GCancellable          *cancellable,
398                               GError               **error)
399 {
400   GBufferedInputStreamClass *class;
401   GInputStream *input_stream;
402   gssize res;
403
404   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
405
406   input_stream = G_INPUT_STREAM (stream);
407
408   if (count < -1)
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
411                    _("Too large count value passed to %s"), G_STRFUNC);
412       return -1;
413     }
414
415   if (!g_input_stream_set_pending (input_stream, error))
416     return -1;
417
418   if (cancellable)
419     g_cancellable_push_current (cancellable);
420
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_cancellable_pop_current (cancellable);
426
427   g_input_stream_clear_pending (input_stream);
428
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_clear_pending (G_INPUT_STREAM (stream));
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: a #GBufferedInputStream
447  * @count: the number of bytes that will be read from the stream
448  * @io_priority: the <link linkend="io-priority">I/O priority</link>
449  *     of the request
450  * @cancellable: optional #GCancellable object
451  * @callback: a #GAsyncReadyCallback
452  * @user_data: a #gpointer
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  *
458  * If @count is -1 then the attempted read size is equal to the number
459  * of bytes that are required to fill the buffer.
460  */
461 void
462 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
463                                     gssize                count,
464                                     int                   io_priority,
465                                     GCancellable         *cancellable,
466                                     GAsyncReadyCallback   callback,
467                                     gpointer              user_data)
468 {
469   GBufferedInputStreamClass *class;
470   GSimpleAsyncResult *simple;
471   GError *error = NULL;
472
473   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
474
475   if (count == 0)
476     {
477       simple = g_simple_async_result_new (G_OBJECT (stream),
478                                           callback,
479                                           user_data,
480                                           g_buffered_input_stream_fill_async);
481       g_simple_async_result_complete_in_idle (simple);
482       g_object_unref (simple);
483       return;
484     }
485
486   if (count < -1)
487     {
488       g_simple_async_report_error_in_idle (G_OBJECT (stream),
489                                            callback,
490                                            user_data,
491                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
492                                            _("Too large count value passed to %s"),
493                                            G_STRFUNC);
494       return;
495     }
496
497   if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
498     {
499       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
500                                             callback,
501                                             user_data,
502                                             error);
503       g_error_free (error);
504       return;
505     }
506
507   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
508
509   stream->priv->outstanding_callback = callback;
510   g_object_ref (stream);
511   class->fill_async (stream, count, io_priority, cancellable,
512                      async_fill_callback_wrapper, user_data);
513 }
514
515 /**
516  * g_buffered_input_stream_fill_finish:
517  * @stream: a #GBufferedInputStream
518  * @result: a #GAsyncResult
519  * @error: a #GError
520  *
521  * Finishes an asynchronous read.
522  *
523  * Returns: a #gssize of the read stream, or %-1 on an error.
524  */
525 gssize
526 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
527                                      GAsyncResult          *result,
528                                      GError               **error)
529 {
530   GSimpleAsyncResult *simple;
531   GBufferedInputStreamClass *class;
532
533   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
534   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
535
536   if (G_IS_SIMPLE_ASYNC_RESULT (result))
537     {
538       simple = G_SIMPLE_ASYNC_RESULT (result);
539       if (g_simple_async_result_propagate_error (simple, error))
540         return -1;
541
542       /* Special case read of 0 bytes */
543       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
544         return 0;
545     }
546
547   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
548   return class->fill_finish (stream, result, error);
549 }
550
551 /**
552  * g_buffered_input_stream_get_available:
553  * @stream: #GBufferedInputStream
554  *
555  * Gets the size of the available data within the stream.
556  *
557  * Returns: size of the available stream.
558  */
559 gsize
560 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
561 {
562   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
563
564   return stream->priv->end - stream->priv->pos;
565 }
566
567 /**
568  * g_buffered_input_stream_peek:
569  * @stream: a #GBufferedInputStream
570  * @buffer: a pointer to an allocated chunk of memory
571  * @offset: a #gsize
572  * @count: a #gsize
573  *
574  * Peeks in the buffer, copying data of size @count into @buffer,
575  * offset @offset bytes.
576  *
577  * Returns: a #gsize of the number of bytes peeked, or -1 on error.
578  */
579 gsize
580 g_buffered_input_stream_peek (GBufferedInputStream *stream,
581                               void                 *buffer,
582                               gsize                 offset,
583                               gsize                 count)
584 {
585   gsize available;
586   gsize end;
587
588   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
589   g_return_val_if_fail (buffer != NULL, -1);
590
591   available = g_buffered_input_stream_get_available (stream);
592
593   if (offset > available)
594     return 0;
595
596   end = MIN (offset + count, available);
597   count = end - offset;
598
599   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
600   return count;
601 }
602
603 /**
604  * g_buffered_input_stream_peek_buffer:
605  * @stream: a #GBufferedInputStream
606  * @count: a #gsize to get the number of bytes available in the buffer
607  *
608  * Returns the buffer with the currently available bytes. The returned
609  * buffer must not be modified and will become invalid when reading from
610  * the stream or filling the buffer.
611  *
612  * Returns: read-only buffer
613  */
614 const void*
615 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
616                                      gsize                *count)
617 {
618   GBufferedInputStreamPrivate *priv;
619
620   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
621
622   priv = stream->priv;
623
624   if (count)
625     *count = priv->end - priv->pos;
626
627   return priv->buffer + priv->pos;
628 }
629
630 static void
631 compact_buffer (GBufferedInputStream *stream)
632 {
633   GBufferedInputStreamPrivate *priv;
634   gsize current_size;
635
636   priv = stream->priv;
637
638   current_size = priv->end - priv->pos;
639
640   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
641
642   priv->pos = 0;
643   priv->end = current_size;
644 }
645
646 static gssize
647 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
648                                    gssize                 count,
649                                    GCancellable          *cancellable,
650                                    GError               **error)
651 {
652   GBufferedInputStreamPrivate *priv;
653   GInputStream *base_stream;
654   gssize nread;
655   gsize in_buffer;
656
657   priv = stream->priv;
658
659   if (count == -1)
660     count = priv->len;
661
662   in_buffer = priv->end - priv->pos;
663
664   /* Never fill more than can fit in the buffer */
665   count = MIN (count, priv->len - in_buffer);
666
667   /* If requested length does not fit at end, compact */
668   if (priv->len - priv->end < count)
669     compact_buffer (stream);
670
671   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
672   nread = g_input_stream_read (base_stream,
673                                priv->buffer + priv->end,
674                                count,
675                                cancellable,
676                                error);
677
678   if (nread > 0)
679     priv->end += nread;
680
681   return nread;
682 }
683
684 static gssize
685 g_buffered_input_stream_skip (GInputStream  *stream,
686                               gsize          count,
687                               GCancellable  *cancellable,
688                               GError       **error)
689 {
690   GBufferedInputStream        *bstream;
691   GBufferedInputStreamPrivate *priv;
692   GBufferedInputStreamClass *class;
693   GInputStream *base_stream;
694   gsize available, bytes_skipped;
695   gssize nread;
696
697   bstream = G_BUFFERED_INPUT_STREAM (stream);
698   priv = bstream->priv;
699
700   available = priv->end - priv->pos;
701
702   if (count <= available)
703     {
704       priv->pos += count;
705       return count;
706     }
707
708   /* Full request not available, skip all currently available and
709    * request refill for more
710    */
711
712   priv->pos = 0;
713   priv->end = 0;
714   bytes_skipped = available;
715   count -= available;
716
717   if (bytes_skipped > 0)
718     error = NULL; /* Ignore further errors if we already read some data */
719
720   if (count > priv->len)
721     {
722       /* Large request, shortcut buffer */
723
724       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
725
726       nread = g_input_stream_skip (base_stream,
727                                    count,
728                                    cancellable,
729                                    error);
730
731       if (nread < 0 && bytes_skipped == 0)
732         return -1;
733
734       if (nread > 0)
735         bytes_skipped += nread;
736
737       return bytes_skipped;
738     }
739
740   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
741   nread = class->fill (bstream, priv->len, cancellable, error);
742
743   if (nread < 0)
744     {
745       if (bytes_skipped == 0)
746         return -1;
747       else
748         return bytes_skipped;
749     }
750
751   available = priv->end - priv->pos;
752   count = MIN (count, available);
753
754   bytes_skipped += count;
755   priv->pos += count;
756
757   return bytes_skipped;
758 }
759
760 static gssize
761 g_buffered_input_stream_read (GInputStream *stream,
762                               void         *buffer,
763                               gsize         count,
764                               GCancellable *cancellable,
765                               GError      **error)
766 {
767   GBufferedInputStream        *bstream;
768   GBufferedInputStreamPrivate *priv;
769   GBufferedInputStreamClass *class;
770   GInputStream *base_stream;
771   gsize available, bytes_read;
772   gssize nread;
773
774   bstream = G_BUFFERED_INPUT_STREAM (stream);
775   priv = bstream->priv;
776
777   available = priv->end - priv->pos;
778
779   if (count <= available)
780     {
781       memcpy (buffer, priv->buffer + priv->pos, count);
782       priv->pos += count;
783       return count;
784     }
785
786   /* Full request not available, read all currently available and
787    * request refill for more
788    */
789
790   memcpy (buffer, priv->buffer + priv->pos, available);
791   priv->pos = 0;
792   priv->end = 0;
793   bytes_read = available;
794   count -= available;
795
796   if (bytes_read > 0)
797     error = NULL; /* Ignore further errors if we already read some data */
798
799   if (count > priv->len)
800     {
801       /* Large request, shortcut buffer */
802
803       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
804
805       nread = g_input_stream_read (base_stream,
806                                    (char *)buffer + bytes_read,
807                                    count,
808                                    cancellable,
809                                    error);
810
811       if (nread < 0 && bytes_read == 0)
812         return -1;
813
814       if (nread > 0)
815         bytes_read += nread;
816
817       return bytes_read;
818     }
819
820   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
821   nread = class->fill (bstream, priv->len, cancellable, error);
822   if (nread < 0)
823     {
824       if (bytes_read == 0)
825         return -1;
826       else
827         return bytes_read;
828     }
829
830   available = priv->end - priv->pos;
831   count = MIN (count, available);
832
833   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
834   bytes_read += count;
835   priv->pos += count;
836
837   return bytes_read;
838 }
839
840 /**
841  * g_buffered_input_stream_read_byte:
842  * @stream: a #GBufferedInputStream
843  * @cancellable: optional #GCancellable object, %NULL to ignore
844  * @error: location to store the error occuring, or %NULL to ignore
845  *
846  * Tries to read a single byte from the stream or the buffer. Will block
847  * during this read.
848  *
849  * On success, the byte read from the stream is returned. On end of stream
850  * -1 is returned but it's not an exceptional error and @error is not set.
851  *
852  * If @cancellable is not %NULL, then the operation can be cancelled by
853  * triggering the cancellable object from another thread. If the operation
854  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
855  * operation was partially finished when the operation was cancelled the
856  * partial result will be returned, without an error.
857  *
858  * On error -1 is returned and @error is set accordingly.
859  *
860  * Returns: the byte read from the @stream, or -1 on end of stream or error.
861  */
862 int
863 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
864                                    GCancellable          *cancellable,
865                                    GError               **error)
866 {
867   GBufferedInputStreamPrivate *priv;
868   GBufferedInputStreamClass *class;
869   GInputStream *input_stream;
870   gsize available;
871   gssize nread;
872
873   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
874
875   priv = stream->priv;
876   input_stream = G_INPUT_STREAM (stream);
877
878   if (g_input_stream_is_closed (input_stream))
879     {
880       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
881                            _("Stream is already closed"));
882       return -1;
883     }
884
885   if (!g_input_stream_set_pending (input_stream, error))
886     return -1;
887
888   available = priv->end - priv->pos;
889
890   if (available != 0)
891     {
892       g_input_stream_clear_pending (input_stream);
893       return priv->buffer[priv->pos++];
894     }
895
896   /* Byte not available, request refill for more */
897
898   if (cancellable)
899     g_cancellable_push_current (cancellable);
900
901   priv->pos = 0;
902   priv->end = 0;
903
904   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
905   nread = class->fill (stream, priv->len, cancellable, error);
906
907   if (cancellable)
908     g_cancellable_pop_current (cancellable);
909
910   g_input_stream_clear_pending (input_stream);
911
912   if (nread <= 0)
913     return -1; /* error or end of stream */
914
915   return priv->buffer[priv->pos++];
916 }
917
918 /* ************************** */
919 /* Async stuff implementation */
920 /* ************************** */
921
922 static void
923 fill_async_callback (GObject      *source_object,
924                      GAsyncResult *result,
925                      gpointer      user_data)
926 {
927   GError *error;
928   gssize res;
929   GSimpleAsyncResult *simple;
930
931   simple = user_data;
932
933   error = NULL;
934   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
935                                     result, &error);
936
937   g_simple_async_result_set_op_res_gssize (simple, res);
938   if (res == -1)
939     {
940       g_simple_async_result_take_error (simple, error);
941     }
942   else
943     {
944       GBufferedInputStreamPrivate *priv;
945       GObject *object;
946
947       object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
948       priv = G_BUFFERED_INPUT_STREAM (object)->priv;
949
950       g_assert_cmpint (priv->end + res, <=, priv->len);
951       priv->end += res;
952
953       g_object_unref (object);
954     }
955
956   /* Complete immediately, not in idle, since we're already
957    * in a mainloop callout
958    */
959   g_simple_async_result_complete (simple);
960   g_object_unref (simple);
961 }
962
963 static void
964 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
965                                          gssize                count,
966                                          int                   io_priority,
967                                          GCancellable         *cancellable,
968                                          GAsyncReadyCallback   callback,
969                                          gpointer              user_data)
970 {
971   GBufferedInputStreamPrivate *priv;
972   GInputStream *base_stream;
973   GSimpleAsyncResult *simple;
974   gsize in_buffer;
975
976   priv = stream->priv;
977
978   if (count == -1)
979     count = priv->len;
980
981   in_buffer = priv->end - priv->pos;
982
983   /* Never fill more than can fit in the buffer */
984   count = MIN (count, priv->len - in_buffer);
985
986   /* If requested length does not fit at end, compact */
987   if (priv->len - priv->end < count)
988     compact_buffer (stream);
989
990   simple = g_simple_async_result_new (G_OBJECT (stream),
991                                       callback, user_data,
992                                       g_buffered_input_stream_real_fill_async);
993
994   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
995   g_input_stream_read_async (base_stream,
996                              priv->buffer + priv->end,
997                              count,
998                              io_priority,
999                              cancellable,
1000                              fill_async_callback,
1001                              simple);
1002 }
1003
1004 static gssize
1005 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1006                                           GAsyncResult         *result,
1007                                           GError              **error)
1008 {
1009   GSimpleAsyncResult *simple;
1010   gssize nread;
1011
1012   simple = G_SIMPLE_ASYNC_RESULT (result);
1013   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1014
1015   nread = g_simple_async_result_get_op_res_gssize (simple);
1016   return nread;
1017 }
1018
/* Per-operation state for an asynchronous read on the buffered stream.
 * It is attached to the operation's GSimpleAsyncResult and released with
 * free_read_async_data().
 */
typedef struct
{
  /* Bytes placed in @buffer so far; this is what read_finish returns */
  gssize bytes_read;
  /* Bytes still wanted once the already-buffered data has been copied out */
  gssize count;
  /* Caller-supplied destination buffer */
  void *buffer;
} ReadAsyncData;
1025
1026 static void
1027 free_read_async_data (gpointer _data)
1028 {
1029   ReadAsyncData *data = _data;
1030   g_slice_free (ReadAsyncData, data);
1031 }
1032
1033 static void
1034 large_read_callback (GObject *source_object,
1035                      GAsyncResult *result,
1036                      gpointer user_data)
1037 {
1038   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1039   ReadAsyncData *data;
1040   GError *error;
1041   gssize nread;
1042
1043   data = g_simple_async_result_get_op_res_gpointer (simple);
1044
1045   error = NULL;
1046   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1047                                       result, &error);
1048
1049   /* Only report the error if we've not already read some data */
1050   if (nread < 0 && data->bytes_read == 0)
1051     g_simple_async_result_take_error (simple, error);
1052   else if (error)
1053     g_error_free (error);
1054
1055   if (nread > 0)
1056     data->bytes_read += nread;
1057
1058   /* Complete immediately, not in idle, since we're already
1059    * in a mainloop callout
1060    */
1061   g_simple_async_result_complete (simple);
1062   g_object_unref (simple);
1063 }
1064
1065 static void
1066 read_fill_buffer_callback (GObject      *source_object,
1067                            GAsyncResult *result,
1068                            gpointer      user_data)
1069 {
1070   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1071   GBufferedInputStream *bstream;
1072   GBufferedInputStreamPrivate *priv;
1073   ReadAsyncData *data;
1074   GError *error;
1075   gssize nread;
1076   gsize available;
1077
1078   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1079   priv = bstream->priv;
1080
1081   data = g_simple_async_result_get_op_res_gpointer (simple);
1082
1083   error = NULL;
1084   nread = g_buffered_input_stream_fill_finish (bstream,
1085                                                result, &error);
1086
1087   if (nread < 0 && data->bytes_read == 0)
1088     g_simple_async_result_take_error (simple, error);
1089   else if (error)
1090     g_error_free (error);
1091
1092   if (nread > 0)
1093     {
1094       available = priv->end - priv->pos;
1095       data->count = MIN (data->count, available);
1096
1097       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1098       data->bytes_read += data->count;
1099       priv->pos += data->count;
1100     }
1101
1102   /* Complete immediately, not in idle, since we're already
1103    * in a mainloop callout
1104    */
1105   g_simple_async_result_complete (simple);
1106   g_object_unref (simple);
1107 }
1108
1109 static void
1110 g_buffered_input_stream_read_async (GInputStream        *stream,
1111                                     void                *buffer,
1112                                     gsize                count,
1113                                     int                  io_priority,
1114                                     GCancellable        *cancellable,
1115                                     GAsyncReadyCallback  callback,
1116                                     gpointer             user_data)
1117 {
1118   GBufferedInputStream *bstream;
1119   GBufferedInputStreamPrivate *priv;
1120   GBufferedInputStreamClass *class;
1121   GInputStream *base_stream;
1122   gsize available;
1123   GSimpleAsyncResult *simple;
1124   ReadAsyncData *data;
1125
1126   bstream = G_BUFFERED_INPUT_STREAM (stream);
1127   priv = bstream->priv;
1128
1129   data = g_slice_new (ReadAsyncData);
1130   data->buffer = buffer;
1131   data->bytes_read = 0;
1132   simple = g_simple_async_result_new (G_OBJECT (stream),
1133                                       callback, user_data,
1134                                       g_buffered_input_stream_read_async);
1135   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1136
1137   available = priv->end - priv->pos;
1138
1139   if (count <= available)
1140     {
1141       memcpy (buffer, priv->buffer + priv->pos, count);
1142       priv->pos += count;
1143       data->bytes_read = count;
1144
1145       g_simple_async_result_complete_in_idle (simple);
1146       g_object_unref (simple);
1147       return;
1148     }
1149
1150
1151   /* Full request not available, read all currently available
1152    * and request refill for more
1153    */
1154
1155   memcpy (buffer, priv->buffer + priv->pos, available);
1156   priv->pos = 0;
1157   priv->end = 0;
1158
1159   count -= available;
1160
1161   data->bytes_read = available;
1162   data->count = count;
1163
1164   if (count > priv->len)
1165     {
1166       /* Large request, shortcut buffer */
1167
1168       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1169
1170       g_input_stream_read_async (base_stream,
1171                                  (char *)buffer + data->bytes_read,
1172                                  count,
1173                                  io_priority, cancellable,
1174                                  large_read_callback,
1175                                  simple);
1176     }
1177   else
1178     {
1179       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1180       class->fill_async (bstream, priv->len, io_priority, cancellable,
1181                          read_fill_buffer_callback, simple);
1182     }
1183 }
1184
1185 static gssize
1186 g_buffered_input_stream_read_finish (GInputStream   *stream,
1187                                      GAsyncResult   *result,
1188                                      GError        **error)
1189 {
1190   GSimpleAsyncResult *simple;
1191   ReadAsyncData *data;
1192
1193   simple = G_SIMPLE_ASYNC_RESULT (result);
1194
1195   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1196
1197   data = g_simple_async_result_get_op_res_gpointer (simple);
1198
1199   return data->bytes_read;
1200 }
1201
/* Per-operation state for an asynchronous skip on the buffered stream.
 * It is attached to the operation's GSimpleAsyncResult and released with
 * free_skip_async_data().
 */
typedef struct
{
  /* Bytes skipped so far; this is what skip_finish returns */
  gssize bytes_skipped;
  /* Bytes still to skip once the already-buffered data has been consumed */
  gssize count;
} SkipAsyncData;
1207
1208 static void
1209 free_skip_async_data (gpointer _data)
1210 {
1211   SkipAsyncData *data = _data;
1212   g_slice_free (SkipAsyncData, data);
1213 }
1214
1215 static void
1216 large_skip_callback (GObject      *source_object,
1217                      GAsyncResult *result,
1218                      gpointer      user_data)
1219 {
1220   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1221   SkipAsyncData *data;
1222   GError *error;
1223   gssize nread;
1224
1225   data = g_simple_async_result_get_op_res_gpointer (simple);
1226
1227   error = NULL;
1228   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1229                                       result, &error);
1230
1231   /* Only report the error if we've not already read some data */
1232   if (nread < 0 && data->bytes_skipped == 0)
1233     g_simple_async_result_take_error (simple, error);
1234   else if (error)
1235     g_error_free (error);
1236
1237   if (nread > 0)
1238     data->bytes_skipped += nread;
1239
1240   /* Complete immediately, not in idle, since we're already
1241    * in a mainloop callout
1242    */
1243   g_simple_async_result_complete (simple);
1244   g_object_unref (simple);
1245 }
1246
1247 static void
1248 skip_fill_buffer_callback (GObject      *source_object,
1249                            GAsyncResult *result,
1250                            gpointer      user_data)
1251 {
1252   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1253   GBufferedInputStream *bstream;
1254   GBufferedInputStreamPrivate *priv;
1255   SkipAsyncData *data;
1256   GError *error;
1257   gssize nread;
1258   gsize available;
1259
1260   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1261   priv = bstream->priv;
1262
1263   data = g_simple_async_result_get_op_res_gpointer (simple);
1264
1265   error = NULL;
1266   nread = g_buffered_input_stream_fill_finish (bstream,
1267                                                result, &error);
1268
1269   if (nread < 0 && data->bytes_skipped == 0)
1270     g_simple_async_result_take_error (simple, error);
1271   else if (error)
1272     g_error_free (error);
1273
1274   if (nread > 0)
1275     {
1276       available = priv->end - priv->pos;
1277       data->count = MIN (data->count, available);
1278
1279       data->bytes_skipped += data->count;
1280       priv->pos += data->count;
1281     }
1282
1283   /* Complete immediately, not in idle, since we're already
1284    * in a mainloop callout
1285    */
1286   g_simple_async_result_complete (simple);
1287   g_object_unref (simple);
1288 }
1289
1290 static void
1291 g_buffered_input_stream_skip_async (GInputStream        *stream,
1292                                     gsize                count,
1293                                     int                  io_priority,
1294                                     GCancellable        *cancellable,
1295                                     GAsyncReadyCallback  callback,
1296                                     gpointer             user_data)
1297 {
1298   GBufferedInputStream *bstream;
1299   GBufferedInputStreamPrivate *priv;
1300   GBufferedInputStreamClass *class;
1301   GInputStream *base_stream;
1302   gsize available;
1303   GSimpleAsyncResult *simple;
1304   SkipAsyncData *data;
1305
1306   bstream = G_BUFFERED_INPUT_STREAM (stream);
1307   priv = bstream->priv;
1308
1309   data = g_slice_new (SkipAsyncData);
1310   data->bytes_skipped = 0;
1311   simple = g_simple_async_result_new (G_OBJECT (stream),
1312                                       callback, user_data,
1313                                       g_buffered_input_stream_skip_async);
1314   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1315
1316   available = priv->end - priv->pos;
1317
1318   if (count <= available)
1319     {
1320       priv->pos += count;
1321       data->bytes_skipped = count;
1322
1323       g_simple_async_result_complete_in_idle (simple);
1324       g_object_unref (simple);
1325       return;
1326     }
1327
1328   /* Full request not available, skip all currently available
1329    * and request refill for more
1330    */
1331
1332   priv->pos = 0;
1333   priv->end = 0;
1334
1335   count -= available;
1336
1337   data->bytes_skipped = available;
1338   data->count = count;
1339
1340   if (count > priv->len)
1341     {
1342       /* Large request, shortcut buffer */
1343
1344       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1345
1346       g_input_stream_skip_async (base_stream,
1347                                  count,
1348                                  io_priority, cancellable,
1349                                  large_skip_callback,
1350                                  simple);
1351     }
1352   else
1353     {
1354       class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1355       class->fill_async (bstream, priv->len, io_priority, cancellable,
1356                          skip_fill_buffer_callback, simple);
1357     }
1358 }
1359
1360 static gssize
1361 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1362                                      GAsyncResult   *result,
1363                                      GError        **error)
1364 {
1365   GSimpleAsyncResult *simple;
1366   SkipAsyncData *data;
1367
1368   simple = G_SIMPLE_ASYNC_RESULT (result);
1369
1370   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1371
1372   data = g_simple_async_result_get_op_res_gpointer (simple);
1373
1374   return data->bytes_skipped;
1375 }