gio/ docs/reference/gio Merged gio-standalone into glib.
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include <config.h>
24
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29
30 #include "glibintl.h"
31
32 #define DEFAULT_BUFFER_SIZE 4096
33
34 struct _GBufferedInputStreamPrivate {
35   guint8 *buffer;
36   gsize   len;
37   gsize   pos;
38   gsize   end;
39   GAsyncReadyCallback outstanding_callback;
40 };
41
42 enum {
43   PROP_0,
44   PROP_BUFSIZE
45 };
46
47 static void g_buffered_input_stream_set_property  (GObject      *object,
48                                                    guint         prop_id,
49                                                    const GValue *value,
50                                                    GParamSpec   *pspec);
51
52 static void g_buffered_input_stream_get_property  (GObject      *object,
53                                                    guint         prop_id,
54                                                    GValue       *value,
55                                                    GParamSpec   *pspec);
56 static void g_buffered_input_stream_finalize      (GObject *object);
57
58
59 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
60                                                         gsize                  count,
61                                                         GCancellable          *cancellable,
62                                                         GError               **error);
63 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
64                                                         gsize                  count,
65                                                         int                    io_priority,
66                                                         GCancellable          *cancellable,
67                                                         GAsyncReadyCallback    callback,
68                                                         gpointer               user_data);
69 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
70                                                         GAsyncResult          *result,
71                                                         GError               **error);
72 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
73                                                         void                  *buffer,
74                                                         gsize                  count,
75                                                         GCancellable          *cancellable,
76                                                         GError               **error);
77 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
78                                                         void                  *buffer,
79                                                         gsize                  count,
80                                                         int                    io_priority,
81                                                         GCancellable          *cancellable,
82                                                         GAsyncReadyCallback    callback,
83                                                         gpointer               user_data);
84 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
85                                                         GAsyncResult          *result,
86                                                         GError               **error);
87 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
88                                                         gssize                 count,
89                                                         GCancellable          *cancellable,
90                                                         GError               **error);
91 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
92                                                         gssize                 count,
93                                                         int                    io_priority,
94                                                         GCancellable          *cancellable,
95                                                         GAsyncReadyCallback    callback,
96                                                         gpointer               user_data);
97 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
98                                                         GAsyncResult          *result,
99                                                         GError               **error);
100
101 static void compact_buffer (GBufferedInputStream *stream);
102
103 G_DEFINE_TYPE (GBufferedInputStream,
104                g_buffered_input_stream,
105                G_TYPE_FILTER_INPUT_STREAM)
106
107
108 static void
109 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
110 {
111   GObjectClass *object_class;
112   GInputStreamClass *istream_class;
113   GBufferedInputStreamClass *bstream_class;
114
115   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
116
117   object_class = G_OBJECT_CLASS (klass);
118   object_class->get_property = g_buffered_input_stream_get_property;
119   object_class->set_property = g_buffered_input_stream_set_property;
120   object_class->finalize     = g_buffered_input_stream_finalize;
121
122   istream_class = G_INPUT_STREAM_CLASS (klass);
123   istream_class->skip = g_buffered_input_stream_skip;
124   istream_class->skip_async  = g_buffered_input_stream_skip_async;
125   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
126   istream_class->read = g_buffered_input_stream_read;
127   istream_class->read_async  = g_buffered_input_stream_read_async;
128   istream_class->read_finish = g_buffered_input_stream_read_finish;
129
130   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
131   bstream_class->fill = g_buffered_input_stream_real_fill;
132   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
133   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
134   
135   g_object_class_install_property (object_class,
136                                    PROP_BUFSIZE,
137                                    g_param_spec_uint ("buffer-size",
138                                                       P_("Buffer Size"),
139                                                       P_("The size of the backend buffer"),
140                                                       1,
141                                                       G_MAXUINT,
142                                                       DEFAULT_BUFFER_SIZE,
143                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
144                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
145
146
147 }
148
149 /**
150  * g_buffered_input_stream_get_buffer_size:
151  * @stream: #GBufferedInputStream.
152  * 
153  * Returns: the current buffer size, or -1 on error.
154  **/
155 gsize
156 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
157 {
158   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
159
160   return stream->priv->len;
161 }
162
163 /**
164  * g_buffered_input_stream_set_buffer_size:
165  * @stream: #GBufferedInputStream.
166  * @size: a #gsize.
167  *
168  * Sets the size of the internal buffer of @stream to @size, or to the 
169  * size of the contents of the buffer. The buffer can never be resized 
170  * smaller than its current contents.
171  **/
172 void
173 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
174                                          gsize                  size)
175 {
176   GBufferedInputStreamPrivate *priv;
177   gsize in_buffer;
178   guint8 *buffer;
179   
180   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
181
182   priv = stream->priv;
183
184   if (priv->buffer)
185     {
186       in_buffer = priv->end - priv->pos;
187
188       /* Never resize smaller than current buffer contents */
189       size = MAX (size, in_buffer);
190
191       buffer = g_malloc (size);
192       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
193       priv->len = size;
194       priv->pos = 0;
195       priv->end = in_buffer;
196       g_free (priv->buffer);
197       priv->buffer = buffer;
198     }
199   else
200     {
201       priv->len = size;
202       priv->pos = 0;
203       priv->end = 0;
204       priv->buffer = g_malloc (size);
205     }
206 }
207
208 static void
209 g_buffered_input_stream_set_property (GObject         *object,
210                                       guint            prop_id,
211                                       const GValue    *value,
212                                       GParamSpec      *pspec)
213 {
214   GBufferedInputStreamPrivate *priv;
215   GBufferedInputStream        *bstream;
216
217   bstream = G_BUFFERED_INPUT_STREAM (object);
218   priv = bstream->priv;
219
220   switch (prop_id) 
221     {
222     case PROP_BUFSIZE:
223       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
224       break;
225
226     default:
227       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
228       break;
229     }
230
231 }
232
233 static void
234 g_buffered_input_stream_get_property (GObject    *object,
235                                       guint       prop_id,
236                                       GValue     *value,
237                                       GParamSpec *pspec)
238 {
239   GBufferedInputStreamPrivate *priv;
240   GBufferedInputStream        *bstream;
241
242   bstream = G_BUFFERED_INPUT_STREAM (object);
243   priv = bstream->priv;
244
245   switch (prop_id)
246     { 
247     
248     case PROP_BUFSIZE:
249       g_value_set_uint (value, priv->len);
250       break;
251     default:
252       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
253       break;
254     }
255
256 }
257
258 static void
259 g_buffered_input_stream_finalize (GObject *object)
260 {
261   GBufferedInputStreamPrivate *priv;
262   GBufferedInputStream        *stream;
263
264   stream = G_BUFFERED_INPUT_STREAM (object);
265   priv = stream->priv;
266
267   g_free (priv->buffer);
268
269   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
270     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
271 }
272
273 static void
274 g_buffered_input_stream_init (GBufferedInputStream *stream)
275 {
276   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
277                                               G_TYPE_BUFFERED_INPUT_STREAM,
278                                               GBufferedInputStreamPrivate);
279 }
280
281
282 /**
283  * g_buffered_input_stream_new:
284  * @base_stream: a #GInputStream.
285  * 
286  * Creates a new #GInputStream from the given @base_stream, with 
287  * a buffer set to the default size (4 kilobytes).
288  *
289  * Returns: a #GInputStream for the given @base_stream.
290  **/
291 GInputStream *
292 g_buffered_input_stream_new (GInputStream *base_stream)
293 {
294   GInputStream *stream;
295
296   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
297
298   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
299                          "base-stream", base_stream,
300                          NULL);
301
302   return stream;
303 }
304
305 /**
306  * g_buffered_input_stream_new_sized:
307  * @base_stream: a #GOutputStream.
308  * @size: a #gsize.
309  * 
310  * Creates a new #GBufferedInputStream from the given @base_stream, with 
311  * a buffer set to @size.
312  *
313  * Returns: a #GInputStream.
314  **/
315 GInputStream *
316 g_buffered_input_stream_new_sized (GInputStream *base_stream,
317                                    gsize         size)
318 {
319   GInputStream *stream;
320
321   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
322
323   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
324                          "base-stream", base_stream,
325                          "buffer-size", (guint)size,
326                          NULL);
327
328   return stream;
329 }
330
331 /**
332  * g_buffered_input_stream_fill:
333  * @stream: #GBufferedInputStream.
334  * @count: the number of bytes that will be read from the stream.
335  * @cancellable: optional #GCancellable object, %NULL to ignore.
336  * @error: location to store the error occuring, or %NULL to ignore.
337  *
338  * Tries to read @count bytes from the stream into the buffer. 
339  * Will block during this read.
340  * 
341  * If @count is zero, returns zero and does nothing. A value of @count
342  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
343  *
344  * On success, the number of bytes read into the buffer is returned.
345  * It is not an error if this is not the same as the requested size, as it
346  * can happen e.g. near the end of a file. Zero is returned on end of file
347  * (or if @count is zero),  but never otherwise.
348  *
349  * If @cancellable is not %NULL, then the operation can be cancelled by
350  * triggering the cancellable object from another thread. If the operation
351  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
352  * operation was partially finished when the operation was cancelled the
353  * partial result will be returned, without an error.
354  *
355  * On error -1 is returned and @error is set accordingly.
356  * 
357  * For the asynchronous, non-blocking, version of this function, see 
358  * g_buffered_input_stream_fill_async().
359  *
360  * Returns: the number of bytes read into @stream's buffer, up to @count, 
361  * or -1 on error.
362  **/
363 gssize
364 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
365                               gssize                 count,
366                               GCancellable          *cancellable,
367                               GError               **error)
368 {
369   GBufferedInputStreamClass *class;
370   GInputStream *input_stream;
371   gssize res;
372
373   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
374   
375   input_stream = G_INPUT_STREAM (stream);
376   
377   if (g_input_stream_is_closed (input_stream))
378     {
379       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
380                    _("Stream is already closed"));
381       return -1;
382     }
383   
384   if (g_input_stream_has_pending (input_stream))
385     {
386       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
387                    _("Stream has outstanding operation"));
388       return -1;
389     }
390       
391   g_input_stream_set_pending (input_stream, TRUE);
392
393   if (cancellable)
394     g_push_current_cancellable (cancellable);
395   
396   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
397   res = class->fill (stream, count, cancellable, error);
398
399   if (cancellable)
400     g_pop_current_cancellable (cancellable);
401   
402   g_input_stream_set_pending (input_stream, FALSE);
403   
404   return res;
405 }
406
407 static void
408 async_fill_callback_wrapper (GObject *source_object,
409                              GAsyncResult *res,
410                              gpointer      user_data)
411 {
412   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
413
414   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
415   (*stream->priv->outstanding_callback) (source_object, res, user_data);
416   g_object_unref (stream);
417 }
418
419 /**
420  * g_buffered_input_stream_fill_async:
421  * @stream: #GBufferedInputStream.
422  * @count: a #gssize.
423  * @io_priority: the io priority of the request. the io priority of the request.
424  * @cancellable: optional #GCancellable object
425  * @callback: a #GAsyncReadyCallback.
426  * @user_data: a #gpointer.
427  *
428  * Reads data into @stream's buffer asynchronously, up to @count size.
429  * @io_priority can be used to prioritize reads. For the synchronous
430  * version of this function, see g_buffered_input_stream_fill().
431  * 
432  **/
433
434 void
435 g_buffered_input_stream_fill_async (GBufferedInputStream  *stream,
436                                     gssize                 count,
437                                     int                    io_priority,
438                                     GCancellable          *cancellable,
439                                     GAsyncReadyCallback    callback,
440                                     gpointer               user_data)
441 {
442   GBufferedInputStreamClass *class;
443   GSimpleAsyncResult *simple;
444
445   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
446
447   if (count == 0)
448     {
449       simple = g_simple_async_result_new (G_OBJECT (stream),
450                                           callback,
451                                           user_data,
452                                           g_buffered_input_stream_fill_async);
453       g_simple_async_result_complete_in_idle (simple);
454       g_object_unref (simple);
455       return;
456     }
457   
458   if (((gssize) count) < 0)
459     {
460       g_simple_async_report_error_in_idle (G_OBJECT (stream),
461                                            callback,
462                                            user_data,
463                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
464                                            _("Too large count value passed to g_input_stream_read_async"));
465       return;
466     }
467   
468   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
469     {
470       g_simple_async_report_error_in_idle (G_OBJECT (stream),
471                                            callback,
472                                            user_data,
473                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
474                                            _("Stream is already closed"));
475       return;
476     }
477     
478   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
479     {
480       g_simple_async_report_error_in_idle (G_OBJECT (stream),
481                                            callback,
482                                            user_data,
483                                            G_IO_ERROR, G_IO_ERROR_PENDING,
484                                            _("Stream has outstanding operation"));
485       return;
486     }
487
488   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
489   
490   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
491   stream->priv->outstanding_callback = callback;
492   g_object_ref (stream);
493   class->fill_async (stream, count, io_priority, cancellable,
494                      async_fill_callback_wrapper, user_data);
495 }
496
497 /**
498  * g_buffered_input_stream_fill_finished:
499  * @stream: a #GBufferedInputStream.
500  * @result: a #GAsyncResult.
501  * @error: a #GError.
502  *
503  * Finishes an asynchronous read.
504  * 
505  * Returns: a #gssize of the read stream, or -1 on an error. 
506  **/
507 gssize
508 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
509                                      GAsyncResult          *result,
510                                      GError               **error)
511 {
512   GSimpleAsyncResult *simple;
513   GBufferedInputStreamClass *class;
514
515   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
516   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
517
518   if (G_IS_SIMPLE_ASYNC_RESULT (result))
519     {
520       simple = G_SIMPLE_ASYNC_RESULT (result);
521       if (g_simple_async_result_propagate_error (simple, error))
522         return -1;
523
524       /* Special case read of 0 bytes */
525       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
526         return 0;
527     }
528
529   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
530   return class->fill_finish (stream, result, error);
531 }
532
533 /**
534  * g_buffered_input_stream_get_available:
535  * @stream: #GBufferedInputStream.
536  * 
537  * Returns: size of the available stream. 
538  **/
539 gsize
540 g_buffered_input_stream_get_available (GBufferedInputStream  *stream)
541 {
542   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
543
544   return stream->priv->end - stream->priv->pos;
545 }
546
547 /**
548  * g_buffered_input_stream_peek:
549  * @stream: a #GBufferedInputStream.
550  * @buffer: a pointer to an allocated chunk of memory.
551  * @offset: a #gsize.
552  * @count: a #gsize.
553  * 
554  * Returns: 
555  **/
556 gsize
557 g_buffered_input_stream_peek (GBufferedInputStream  *stream,
558                               void                  *buffer,
559                               gsize                  offset,
560                               gsize                  count)
561 {
562   gsize available;
563   gsize end;
564   
565   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
566   g_return_val_if_fail (buffer != NULL, -1);
567
568   available = g_buffered_input_stream_get_available (stream);
569
570   if (offset > available)
571     return 0;
572   
573   end = MIN (offset + count, available);
574   count = end - offset;
575   
576   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
577   return count;
578 }
579
580 static void
581 compact_buffer (GBufferedInputStream *stream)
582 {
583   GBufferedInputStreamPrivate *priv;
584   gsize current_size;
585
586   priv = stream->priv;
587
588   current_size = priv->end - priv->pos;
589   
590   g_memmove (priv->buffer,
591              priv->buffer + priv->pos,
592              current_size);
593   
594   priv->pos = 0;
595   priv->end = current_size;
596 }
597
598 static gssize
599 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
600                                    gssize                 count,
601                                    GCancellable         *cancellable,
602                                    GError              **error)
603 {
604   GBufferedInputStreamPrivate *priv;
605   GInputStream *base_stream;
606   gssize nread;
607   gsize in_buffer;
608
609   priv = stream->priv;
610
611   if (count == -1)
612     count = priv->len;
613   
614   in_buffer = priv->end - priv->pos;
615
616   /* Never fill more than can fit in the buffer */
617   count = MIN (count, priv->len - in_buffer);
618
619   /* If requested length does not fit at end, compact */
620   if (priv->len - priv->end < count)
621     compact_buffer (stream);
622
623   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
624   nread = g_input_stream_read (base_stream,
625                                priv->buffer + priv->end,
626                                count,
627                                cancellable,
628                                error);
629
630   if (nread > 0)
631     priv->end += nread;
632   
633   return nread;
634 }
635
636 static gssize
637 g_buffered_input_stream_skip (GInputStream          *stream,
638                               gsize                  count,
639                               GCancellable          *cancellable,
640                               GError               **error)
641 {
642   GBufferedInputStream        *bstream;
643   GBufferedInputStreamPrivate *priv;
644   GInputStream *base_stream;
645   gsize available, bytes_skipped;
646   gssize nread;
647
648   bstream = G_BUFFERED_INPUT_STREAM (stream);
649   priv = bstream->priv;
650
651   available = priv->end - priv->pos;
652
653   if (count <= available)
654     {
655       priv->pos += count;
656       return count;
657     }
658
659   /* Full request not available, skip all currently availbile and request refill for more */
660   
661   priv->pos = 0;
662   priv->end = 0;
663   bytes_skipped = available;
664   count -= available;
665
666   if (bytes_skipped > 0)
667     error = NULL; /* Ignore further errors if we already read some data */
668   
669   if (count > priv->len)
670     {
671       /* Large request, shortcut buffer */
672       
673       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
674
675       nread = g_input_stream_skip (base_stream,
676                                    count,
677                                    cancellable,
678                                    error);
679       
680       if (nread < 0 && bytes_skipped == 0)
681         return -1;
682       
683       if (nread > 0)
684         bytes_skipped += nread;
685       
686       return bytes_skipped;
687     }
688   
689   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
690   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
691   g_input_stream_set_pending (stream, TRUE); /* enable again */
692   
693   if (nread < 0)
694     {
695       if (bytes_skipped == 0)
696         return -1;
697       else
698         return bytes_skipped;
699     }
700   
701   available = priv->end - priv->pos;
702   count = MIN (count, available);
703   
704   bytes_skipped += count;
705   priv->pos += count;
706   
707   return bytes_skipped;
708 }
709
710 static gssize
711 g_buffered_input_stream_read (GInputStream *stream,
712                               void         *buffer,
713                               gsize         count,
714                               GCancellable *cancellable,
715                               GError      **error)
716 {
717   GBufferedInputStream        *bstream;
718   GBufferedInputStreamPrivate *priv;
719   GInputStream *base_stream;
720   gsize available, bytes_read;
721   gssize nread;
722
723   bstream = G_BUFFERED_INPUT_STREAM (stream);
724   priv = bstream->priv;
725
726   available = priv->end - priv->pos;
727
728   if (count <= available)
729     {
730       memcpy (buffer, priv->buffer + priv->pos, count);
731       priv->pos += count;
732       return count;
733     }
734   
735   /* Full request not available, read all currently availbile and request refill for more */
736   
737   memcpy (buffer, priv->buffer + priv->pos, available);
738   priv->pos = 0;
739   priv->end = 0;
740   bytes_read = available;
741   count -= available;
742
743   if (bytes_read > 0)
744     error = NULL; /* Ignore further errors if we already read some data */
745   
746   if (count > priv->len)
747     {
748       /* Large request, shortcut buffer */
749       
750       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
751
752       nread = g_input_stream_read (base_stream,
753                                    (char *)buffer + bytes_read,
754                                    count,
755                                    cancellable,
756                                    error);
757       
758       if (nread < 0 && bytes_read == 0)
759         return -1;
760       
761       if (nread > 0)
762         bytes_read += nread;
763       
764       return bytes_read;
765     }
766   
767   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
768   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
769   g_input_stream_set_pending (stream, TRUE); /* enable again */
770   if (nread < 0)
771     {
772       if (bytes_read == 0)
773         return -1;
774       else
775         return bytes_read;
776     }
777   
778   available = priv->end - priv->pos;
779   count = MIN (count, available);
780   
781   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
782   bytes_read += count;
783   priv->pos += count;
784   
785   return bytes_read;
786 }
787
788 /* ************************** */
789 /* Async stuff implementation */
790 /* ************************** */
791
792 static void
793 fill_async_callback (GObject *source_object,
794                      GAsyncResult *result,
795                      gpointer user_data)
796 {
797   GError *error;
798   gssize res;
799   GSimpleAsyncResult *simple;
800
801   simple = user_data;
802   
803   error = NULL;
804   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
805                                     result, &error);
806
807   g_simple_async_result_set_op_res_gssize (simple, res);
808   if (res == -1)
809     {
810       g_simple_async_result_set_from_error (simple, error);
811       g_error_free (error);
812     }
813   
814   /* Complete immediately, not in idle, since we're already in a mainloop callout */
815   g_simple_async_result_complete (simple);
816   g_object_unref (simple);
817 }
818
819 static void
820 g_buffered_input_stream_real_fill_async  (GBufferedInputStream *stream,
821                                           gssize                count,
822                                           int                   io_priority,
823                                           GCancellable         *cancellable,
824                                           GAsyncReadyCallback   callback,
825                                           gpointer              user_data)
826 {
827   GBufferedInputStreamPrivate *priv;
828   GInputStream *base_stream;
829   GSimpleAsyncResult *simple;
830   gsize in_buffer;
831
832   priv = stream->priv;
833
834   if (count == -1)
835     count = priv->len;
836   
837   in_buffer = priv->end - priv->pos;
838
839   /* Never fill more than can fit in the buffer */
840   count = MIN (count, priv->len - in_buffer);
841
842   /* If requested length does not fit at end, compact */
843   if (priv->len - priv->end < count)
844     compact_buffer (stream);
845
846   simple = g_simple_async_result_new (G_OBJECT (stream),
847                                       callback, user_data,
848                                       g_buffered_input_stream_real_fill_async);
849   
850   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
851   g_input_stream_read_async (base_stream,
852                              priv->buffer + priv->end,
853                              count,
854                              io_priority,
855                              cancellable,
856                              fill_async_callback,
857                              simple);
858 }
859
860 static gssize
861 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
862                                           GAsyncResult         *result,
863                                           GError              **error)
864 {
865   GSimpleAsyncResult *simple;
866   gssize nread;
867
868   simple = G_SIMPLE_ASYNC_RESULT (result);
869   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
870   
871   nread = g_simple_async_result_get_op_res_gssize (simple);
872   return nread;
873 }
874
875 typedef struct {
876   gssize bytes_read;
877   gssize count;
878   void *buffer;
879 } ReadAsyncData;
880
881 static void 
882 free_read_async_data (gpointer _data)
883 {
884   ReadAsyncData *data = _data;
885   g_slice_free (ReadAsyncData, data);
886 }
887
888 static void
889 large_read_callback (GObject *source_object,
890                      GAsyncResult *result,
891                      gpointer user_data)
892 {
893   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
894   ReadAsyncData *data;
895   GError *error;
896   gssize nread;
897
898   data = g_simple_async_result_get_op_res_gpointer (simple);
899   
900   error = NULL;
901   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
902                                       result, &error);
903
904   /* Only report the error if we've not already read some data */
905   if (nread < 0 && data->bytes_read == 0)
906     g_simple_async_result_set_from_error (simple, error);
907   
908   if (nread > 0)
909     data->bytes_read += nread;
910   
911   if (error)
912     g_error_free (error);
913   
914   /* Complete immediately, not in idle, since we're already in a mainloop callout */
915   g_simple_async_result_complete (simple);
916   g_object_unref (simple);
917 }
918
919 static void
920 read_fill_buffer_callback (GObject *source_object,
921                            GAsyncResult *result,
922                            gpointer user_data)
923 {
924   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
925   GBufferedInputStream *bstream;
926   GBufferedInputStreamPrivate *priv;
927   ReadAsyncData *data;
928   GError *error;
929   gssize nread;
930   gsize available;
931
932   bstream = G_BUFFERED_INPUT_STREAM (source_object);
933   priv = bstream->priv;
934   
935   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
936   
937   data = g_simple_async_result_get_op_res_gpointer (simple);
938   
939   error = NULL;
940   nread = g_buffered_input_stream_fill_finish (bstream,
941                                                result, &error);
942   
943   if (nread < 0 && data->bytes_read == 0)
944     g_simple_async_result_set_from_error (simple, error);
945
946
947   if (nread > 0)
948     {
949       available = priv->end - priv->pos;
950       data->count = MIN (data->count, available);
951       
952       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
953       data->bytes_read += data->count;
954       priv->pos += data->count;
955     }
956
957   if (error)
958     g_error_free (error);
959   
960   /* Complete immediately, not in idle, since we're already in a mainloop callout */
961   g_simple_async_result_complete (simple);
962   g_object_unref (simple);
963 }
964
965 static void
966 g_buffered_input_stream_read_async (GInputStream              *stream,
967                                     void                      *buffer,
968                                     gsize                      count,
969                                     int                        io_priority,
970                                     GCancellable              *cancellable,
971                                     GAsyncReadyCallback        callback,
972                                     gpointer                   user_data)
973 {
974   GBufferedInputStream *bstream;
975   GBufferedInputStreamPrivate *priv;
976   GInputStream *base_stream;
977   gsize available;
978   GSimpleAsyncResult *simple;
979   ReadAsyncData *data;
980
981   bstream = G_BUFFERED_INPUT_STREAM (stream);
982   priv = bstream->priv;
983
984   data = g_slice_new (ReadAsyncData);
985   data->buffer = buffer;
986   data->bytes_read = 0;
987   simple = g_simple_async_result_new (G_OBJECT (stream),
988                                       callback, user_data,
989                                       g_buffered_input_stream_read_async);
990   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
991   
992   available = priv->end - priv->pos;
993   
994   if (count <= available)
995     {
996       memcpy (buffer, priv->buffer + priv->pos, count);
997       priv->pos += count;
998       data->bytes_read = count;
999       
1000       g_simple_async_result_complete_in_idle (simple);
1001       g_object_unref (simple);
1002       return;
1003     }
1004
1005
1006   /* Full request not available, read all currently availbile and request refill for more */
1007   
1008   memcpy (buffer, priv->buffer + priv->pos, available);
1009   priv->pos = 0;
1010   priv->end = 0;
1011   
1012   count -= available;
1013   
1014   data->bytes_read = available;
1015   data->count = count;
1016
1017   if (count > priv->len)
1018     {
1019       /* Large request, shortcut buffer */
1020       
1021       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1022       
1023       g_input_stream_read_async (base_stream,
1024                                  (char *)buffer + data->bytes_read,
1025                                  count,
1026                                  io_priority, cancellable,
1027                                  large_read_callback,
1028                                  simple);
1029     }
1030   else
1031     {
1032       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1033       g_buffered_input_stream_fill_async (bstream, priv->len,
1034                                           io_priority, cancellable,
1035                                           read_fill_buffer_callback, simple);
1036     }
1037 }
1038
1039 static gssize
1040 g_buffered_input_stream_read_finish (GInputStream   *stream,
1041                                      GAsyncResult   *result,
1042                                      GError        **error)
1043 {
1044   GSimpleAsyncResult *simple;
1045   ReadAsyncData *data;
1046   
1047   simple = G_SIMPLE_ASYNC_RESULT (result);
1048   
1049   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1050
1051   data = g_simple_async_result_get_op_res_gpointer (simple);
1052   
1053   return data->bytes_read;
1054 }
1055
1056 typedef struct {
1057   gssize bytes_skipped;
1058   gssize count;
1059 } SkipAsyncData;
1060
1061 static void 
1062 free_skip_async_data (gpointer _data)
1063 {
1064   SkipAsyncData *data = _data;
1065   g_slice_free (SkipAsyncData, data);
1066 }
1067
1068 static void
1069 large_skip_callback (GObject *source_object,
1070                      GAsyncResult *result,
1071                      gpointer user_data)
1072 {
1073   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1074   SkipAsyncData *data;
1075   GError *error;
1076   gssize nread;
1077
1078   data = g_simple_async_result_get_op_res_gpointer (simple);
1079   
1080   error = NULL;
1081   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1082                                       result, &error);
1083
1084   /* Only report the error if we've not already read some data */
1085   if (nread < 0 && data->bytes_skipped == 0)
1086     g_simple_async_result_set_from_error (simple, error);
1087   
1088   if (nread > 0)
1089     data->bytes_skipped += nread;
1090   
1091   if (error)
1092     g_error_free (error);
1093   
1094   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1095   g_simple_async_result_complete (simple);
1096   g_object_unref (simple);
1097 }
1098
1099 static void
1100 skip_fill_buffer_callback (GObject *source_object,
1101                            GAsyncResult *result,
1102                            gpointer user_data)
1103 {
1104   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1105   GBufferedInputStream *bstream;
1106   GBufferedInputStreamPrivate *priv;
1107   SkipAsyncData *data;
1108   GError *error;
1109   gssize nread;
1110   gsize available;
1111
1112   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1113   priv = bstream->priv;
1114   
1115   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1116   
1117   data = g_simple_async_result_get_op_res_gpointer (simple);
1118   
1119   error = NULL;
1120   nread = g_buffered_input_stream_fill_finish (bstream,
1121                                                result, &error);
1122   
1123   if (nread < 0 && data->bytes_skipped == 0)
1124     g_simple_async_result_set_from_error (simple, error);
1125
1126
1127   if (nread > 0)
1128     {
1129       available = priv->end - priv->pos;
1130       data->count = MIN (data->count, available);
1131       
1132       data->bytes_skipped += data->count;
1133       priv->pos += data->count;
1134     }
1135
1136   if (error)
1137     g_error_free (error);
1138   
1139   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1140   g_simple_async_result_complete (simple);
1141   g_object_unref (simple);
1142 }
1143
1144 static void
1145 g_buffered_input_stream_skip_async (GInputStream              *stream,
1146                                     gsize                      count,
1147                                     int                        io_priority,
1148                                     GCancellable              *cancellable,
1149                                     GAsyncReadyCallback        callback,
1150                                     gpointer                   user_data)
1151 {
1152   GBufferedInputStream *bstream;
1153   GBufferedInputStreamPrivate *priv;
1154   GInputStream *base_stream;
1155   gsize available;
1156   GSimpleAsyncResult *simple;
1157   SkipAsyncData *data;
1158
1159   bstream = G_BUFFERED_INPUT_STREAM (stream);
1160   priv = bstream->priv;
1161
1162   data = g_slice_new (SkipAsyncData);
1163   data->bytes_skipped = 0;
1164   simple = g_simple_async_result_new (G_OBJECT (stream),
1165                                       callback, user_data,
1166                                       g_buffered_input_stream_skip_async);
1167   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1168   
1169   available = priv->end - priv->pos;
1170   
1171   if (count <= available)
1172     {
1173       priv->pos += count;
1174       data->bytes_skipped = count;
1175       
1176       g_simple_async_result_complete_in_idle (simple);
1177       g_object_unref (simple);
1178       return;
1179     }
1180
1181
1182   /* Full request not available, skip all currently availbile and request refill for more */
1183   
1184   priv->pos = 0;
1185   priv->end = 0;
1186   
1187   count -= available;
1188   
1189   data->bytes_skipped = available;
1190   data->count = count;
1191
1192   if (count > priv->len)
1193     {
1194       /* Large request, shortcut buffer */
1195       
1196       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1197       
1198       g_input_stream_skip_async (base_stream,
1199                                  count,
1200                                  io_priority, cancellable,
1201                                  large_skip_callback,
1202                                  simple);
1203     }
1204   else
1205     {
1206       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1207       g_buffered_input_stream_fill_async (bstream, priv->len,
1208                                           io_priority, cancellable,
1209                                           skip_fill_buffer_callback, simple);
1210     }
1211 }
1212
1213 static gssize
1214 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1215                                      GAsyncResult   *result,
1216                                      GError        **error)
1217 {
1218   GSimpleAsyncResult *simple;
1219   SkipAsyncData *data;
1220   
1221   simple = G_SIMPLE_ASYNC_RESULT (result);
1222   
1223   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1224
1225   data = g_simple_async_result_get_op_res_gpointer (simple);
1226   
1227   return data->bytes_skipped;
1228 }
1229
1230 /* vim: ts=2 sw=2 et */