New functions for efficient access to buffer and simple single byte reads.
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25
26 #include "gbufferedinputstream.h"
27 #include "ginputstream.h"
28 #include "gsimpleasyncresult.h"
29 #include <string.h>
30
31 #include "glibintl.h"
32
/* Default value of the "buffer-size" property, in bytes. */
#define DEFAULT_BUFFER_SIZE 4096
34
35 struct _GBufferedInputStreamPrivate {
36   guint8 *buffer;
37   gsize   len;
38   gsize   pos;
39   gsize   end;
40   GAsyncReadyCallback outstanding_callback;
41 };
42
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum
{
  PROP_0 = 0,
  PROP_BUFSIZE
};
47
48 static void g_buffered_input_stream_set_property  (GObject      *object,
49                                                    guint         prop_id,
50                                                    const GValue *value,
51                                                    GParamSpec   *pspec);
52
53 static void g_buffered_input_stream_get_property  (GObject      *object,
54                                                    guint         prop_id,
55                                                    GValue       *value,
56                                                    GParamSpec   *pspec);
57 static void g_buffered_input_stream_finalize      (GObject *object);
58
59
60 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
61                                                         gsize                  count,
62                                                         GCancellable          *cancellable,
63                                                         GError               **error);
64 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
65                                                         gsize                  count,
66                                                         int                    io_priority,
67                                                         GCancellable          *cancellable,
68                                                         GAsyncReadyCallback    callback,
69                                                         gpointer               user_data);
70 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
71                                                         GAsyncResult          *result,
72                                                         GError               **error);
73 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
74                                                         void                  *buffer,
75                                                         gsize                  count,
76                                                         GCancellable          *cancellable,
77                                                         GError               **error);
78 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
79                                                         void                  *buffer,
80                                                         gsize                  count,
81                                                         int                    io_priority,
82                                                         GCancellable          *cancellable,
83                                                         GAsyncReadyCallback    callback,
84                                                         gpointer               user_data);
85 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
86                                                         GAsyncResult          *result,
87                                                         GError               **error);
88 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
89                                                         gssize                 count,
90                                                         GCancellable          *cancellable,
91                                                         GError               **error);
92 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
93                                                         gssize                 count,
94                                                         int                    io_priority,
95                                                         GCancellable          *cancellable,
96                                                         GAsyncReadyCallback    callback,
97                                                         gpointer               user_data);
98 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
99                                                         GAsyncResult          *result,
100                                                         GError               **error);
101
102 static void compact_buffer (GBufferedInputStream *stream);
103
104 G_DEFINE_TYPE (GBufferedInputStream,
105                g_buffered_input_stream,
106                G_TYPE_FILTER_INPUT_STREAM)
107
108
109 static void
110 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
111 {
112   GObjectClass *object_class;
113   GInputStreamClass *istream_class;
114   GBufferedInputStreamClass *bstream_class;
115
116   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
117
118   object_class = G_OBJECT_CLASS (klass);
119   object_class->get_property = g_buffered_input_stream_get_property;
120   object_class->set_property = g_buffered_input_stream_set_property;
121   object_class->finalize     = g_buffered_input_stream_finalize;
122
123   istream_class = G_INPUT_STREAM_CLASS (klass);
124   istream_class->skip = g_buffered_input_stream_skip;
125   istream_class->skip_async  = g_buffered_input_stream_skip_async;
126   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
127   istream_class->read = g_buffered_input_stream_read;
128   istream_class->read_async  = g_buffered_input_stream_read_async;
129   istream_class->read_finish = g_buffered_input_stream_read_finish;
130
131   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
132   bstream_class->fill = g_buffered_input_stream_real_fill;
133   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
134   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
135   
136   g_object_class_install_property (object_class,
137                                    PROP_BUFSIZE,
138                                    g_param_spec_uint ("buffer-size",
139                                                       P_("Buffer Size"),
140                                                       P_("The size of the backend buffer"),
141                                                       1,
142                                                       G_MAXUINT,
143                                                       DEFAULT_BUFFER_SIZE,
144                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
145                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
146
147
148 }
149
150 /**
151  * g_buffered_input_stream_get_buffer_size:
152  * @stream: #GBufferedInputStream.
153  * 
154  * Returns: the current buffer size, or -1 on error.
155  **/
156 gsize
157 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
158 {
159   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
160
161   return stream->priv->len;
162 }
163
164 /**
165  * g_buffered_input_stream_set_buffer_size:
166  * @stream: #GBufferedInputStream.
167  * @size: a #gsize.
168  *
169  * Sets the size of the internal buffer of @stream to @size, or to the 
170  * size of the contents of the buffer. The buffer can never be resized 
171  * smaller than its current contents.
172  **/
173 void
174 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
175                                          gsize                  size)
176 {
177   GBufferedInputStreamPrivate *priv;
178   gsize in_buffer;
179   guint8 *buffer;
180   
181   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
182
183   priv = stream->priv;
184
185   if (priv->buffer)
186     {
187       in_buffer = priv->end - priv->pos;
188
189       /* Never resize smaller than current buffer contents */
190       size = MAX (size, in_buffer);
191
192       buffer = g_malloc (size);
193       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
194       priv->len = size;
195       priv->pos = 0;
196       priv->end = in_buffer;
197       g_free (priv->buffer);
198       priv->buffer = buffer;
199     }
200   else
201     {
202       priv->len = size;
203       priv->pos = 0;
204       priv->end = 0;
205       priv->buffer = g_malloc (size);
206     }
207 }
208
209 static void
210 g_buffered_input_stream_set_property (GObject         *object,
211                                       guint            prop_id,
212                                       const GValue    *value,
213                                       GParamSpec      *pspec)
214 {
215   GBufferedInputStreamPrivate *priv;
216   GBufferedInputStream        *bstream;
217
218   bstream = G_BUFFERED_INPUT_STREAM (object);
219   priv = bstream->priv;
220
221   switch (prop_id) 
222     {
223     case PROP_BUFSIZE:
224       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
225       break;
226
227     default:
228       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
229       break;
230     }
231
232 }
233
234 static void
235 g_buffered_input_stream_get_property (GObject    *object,
236                                       guint       prop_id,
237                                       GValue     *value,
238                                       GParamSpec *pspec)
239 {
240   GBufferedInputStreamPrivate *priv;
241   GBufferedInputStream        *bstream;
242
243   bstream = G_BUFFERED_INPUT_STREAM (object);
244   priv = bstream->priv;
245
246   switch (prop_id)
247     { 
248     
249     case PROP_BUFSIZE:
250       g_value_set_uint (value, priv->len);
251       break;
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255     }
256
257 }
258
259 static void
260 g_buffered_input_stream_finalize (GObject *object)
261 {
262   GBufferedInputStreamPrivate *priv;
263   GBufferedInputStream        *stream;
264
265   stream = G_BUFFERED_INPUT_STREAM (object);
266   priv = stream->priv;
267
268   g_free (priv->buffer);
269
270   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
271     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
272 }
273
274 static void
275 g_buffered_input_stream_init (GBufferedInputStream *stream)
276 {
277   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
278                                               G_TYPE_BUFFERED_INPUT_STREAM,
279                                               GBufferedInputStreamPrivate);
280 }
281
282
283 /**
284  * g_buffered_input_stream_new:
285  * @base_stream: a #GInputStream.
286  * 
287  * Creates a new #GInputStream from the given @base_stream, with 
288  * a buffer set to the default size (4 kilobytes).
289  *
290  * Returns: a #GInputStream for the given @base_stream.
291  **/
292 GInputStream *
293 g_buffered_input_stream_new (GInputStream *base_stream)
294 {
295   GInputStream *stream;
296
297   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
298
299   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
300                          "base-stream", base_stream,
301                          NULL);
302
303   return stream;
304 }
305
306 /**
307  * g_buffered_input_stream_new_sized:
308  * @base_stream: a #GOutputStream.
309  * @size: a #gsize.
310  * 
311  * Creates a new #GBufferedInputStream from the given @base_stream, with 
312  * a buffer set to @size.
313  *
314  * Returns: a #GInputStream.
315  **/
316 GInputStream *
317 g_buffered_input_stream_new_sized (GInputStream *base_stream,
318                                    gsize         size)
319 {
320   GInputStream *stream;
321
322   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
323
324   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
325                          "base-stream", base_stream,
326                          "buffer-size", (guint)size,
327                          NULL);
328
329   return stream;
330 }
331
332 /**
333  * g_buffered_input_stream_fill:
334  * @stream: #GBufferedInputStream.
335  * @count: the number of bytes that will be read from the stream.
336  * @cancellable: optional #GCancellable object, %NULL to ignore.
337  * @error: location to store the error occuring, or %NULL to ignore.
338  *
339  * Tries to read @count bytes from the stream into the buffer. 
340  * Will block during this read.
341  * 
342  * If @count is zero, returns zero and does nothing. A value of @count
343  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
344  *
345  * On success, the number of bytes read into the buffer is returned.
346  * It is not an error if this is not the same as the requested size, as it
347  * can happen e.g. near the end of a file. Zero is returned on end of file
348  * (or if @count is zero),  but never otherwise.
349  *
350  * If @cancellable is not %NULL, then the operation can be cancelled by
351  * triggering the cancellable object from another thread. If the operation
352  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
353  * operation was partially finished when the operation was cancelled the
354  * partial result will be returned, without an error.
355  *
356  * On error -1 is returned and @error is set accordingly.
357  * 
358  * For the asynchronous, non-blocking, version of this function, see 
359  * g_buffered_input_stream_fill_async().
360  *
361  * Returns: the number of bytes read into @stream's buffer, up to @count, 
362  * or -1 on error.
363  **/
364 gssize
365 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
366                               gssize                 count,
367                               GCancellable          *cancellable,
368                               GError               **error)
369 {
370   GBufferedInputStreamClass *class;
371   GInputStream *input_stream;
372   gssize res;
373
374   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
375   
376   input_stream = G_INPUT_STREAM (stream);
377   
378   if (g_input_stream_is_closed (input_stream))
379     {
380       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
381                    _("Stream is already closed"));
382       return -1;
383     }
384   
385   if (g_input_stream_has_pending (input_stream))
386     {
387       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
388                    _("Stream has outstanding operation"));
389       return -1;
390     }
391       
392   g_input_stream_set_pending (input_stream, TRUE);
393
394   if (cancellable)
395     g_push_current_cancellable (cancellable);
396   
397   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
398   res = class->fill (stream, count, cancellable, error);
399
400   if (cancellable)
401     g_pop_current_cancellable (cancellable);
402   
403   g_input_stream_set_pending (input_stream, FALSE);
404   
405   return res;
406 }
407
408 static void
409 async_fill_callback_wrapper (GObject *source_object,
410                              GAsyncResult *res,
411                              gpointer      user_data)
412 {
413   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
414
415   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
416   (*stream->priv->outstanding_callback) (source_object, res, user_data);
417   g_object_unref (stream);
418 }
419
420 /**
421  * g_buffered_input_stream_fill_async:
422  * @stream: #GBufferedInputStream.
423  * @count: a #gssize.
424  * @io_priority: the io priority of the request. the io priority of the request.
425  * @cancellable: optional #GCancellable object
426  * @callback: a #GAsyncReadyCallback.
427  * @user_data: a #gpointer.
428  *
429  * Reads data into @stream's buffer asynchronously, up to @count size.
430  * @io_priority can be used to prioritize reads. For the synchronous
431  * version of this function, see g_buffered_input_stream_fill().
432  * 
433  **/
434
435 void
436 g_buffered_input_stream_fill_async (GBufferedInputStream  *stream,
437                                     gssize                 count,
438                                     int                    io_priority,
439                                     GCancellable          *cancellable,
440                                     GAsyncReadyCallback    callback,
441                                     gpointer               user_data)
442 {
443   GBufferedInputStreamClass *class;
444   GSimpleAsyncResult *simple;
445
446   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
447
448   if (count == 0)
449     {
450       simple = g_simple_async_result_new (G_OBJECT (stream),
451                                           callback,
452                                           user_data,
453                                           g_buffered_input_stream_fill_async);
454       g_simple_async_result_complete_in_idle (simple);
455       g_object_unref (simple);
456       return;
457     }
458   
459   if (((gssize) count) < 0)
460     {
461       g_simple_async_report_error_in_idle (G_OBJECT (stream),
462                                            callback,
463                                            user_data,
464                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
465                                            _("Too large count value passed to g_input_stream_read_async"));
466       return;
467     }
468   
469   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
470     {
471       g_simple_async_report_error_in_idle (G_OBJECT (stream),
472                                            callback,
473                                            user_data,
474                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
475                                            _("Stream is already closed"));
476       return;
477     }
478     
479   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
480     {
481       g_simple_async_report_error_in_idle (G_OBJECT (stream),
482                                            callback,
483                                            user_data,
484                                            G_IO_ERROR, G_IO_ERROR_PENDING,
485                                            _("Stream has outstanding operation"));
486       return;
487     }
488
489   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
490   
491   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
492   stream->priv->outstanding_callback = callback;
493   g_object_ref (stream);
494   class->fill_async (stream, count, io_priority, cancellable,
495                      async_fill_callback_wrapper, user_data);
496 }
497
498 /**
499  * g_buffered_input_stream_fill_finished:
500  * @stream: a #GBufferedInputStream.
501  * @result: a #GAsyncResult.
502  * @error: a #GError.
503  *
504  * Finishes an asynchronous read.
505  * 
506  * Returns: a #gssize of the read stream, or -1 on an error. 
507  **/
508 gssize
509 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
510                                      GAsyncResult          *result,
511                                      GError               **error)
512 {
513   GSimpleAsyncResult *simple;
514   GBufferedInputStreamClass *class;
515
516   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
517   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
518
519   if (G_IS_SIMPLE_ASYNC_RESULT (result))
520     {
521       simple = G_SIMPLE_ASYNC_RESULT (result);
522       if (g_simple_async_result_propagate_error (simple, error))
523         return -1;
524
525       /* Special case read of 0 bytes */
526       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
527         return 0;
528     }
529
530   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
531   return class->fill_finish (stream, result, error);
532 }
533
534 /**
535  * g_buffered_input_stream_get_available:
536  * @stream: #GBufferedInputStream.
537  * 
538  * Returns: size of the available stream. 
539  **/
540 gsize
541 g_buffered_input_stream_get_available (GBufferedInputStream  *stream)
542 {
543   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
544
545   return stream->priv->end - stream->priv->pos;
546 }
547
548 /**
549  * g_buffered_input_stream_peek:
550  * @stream: a #GBufferedInputStream.
551  * @buffer: a pointer to an allocated chunk of memory.
552  * @offset: a #gsize.
553  * @count: a #gsize.
554  * 
555  * Returns: 
556  **/
557 gsize
558 g_buffered_input_stream_peek (GBufferedInputStream  *stream,
559                               void                  *buffer,
560                               gsize                  offset,
561                               gsize                  count)
562 {
563   gsize available;
564   gsize end;
565   
566   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
567   g_return_val_if_fail (buffer != NULL, -1);
568
569   available = g_buffered_input_stream_get_available (stream);
570
571   if (offset > available)
572     return 0;
573   
574   end = MIN (offset + count, available);
575   count = end - offset;
576   
577   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
578   return count;
579 }
580
581 /**
582  * g_buffered_input_stream_peek_buffer:
583  * @stream: a #GBufferedInputStream.
584  * @count: a #gsize to get the number of bytes available in the buffer.
585  *
586  * Returns the buffer with the currently available bytes. The returned
587  * buffer must not be modified and will become invalid when reading from
588  * the stream or filling the buffer.
589  *
590  * Returns: read-only buffer
591  **/
592 const void*
593 g_buffered_input_stream_peek_buffer (GBufferedInputStream  *stream,
594                                      gsize                 *count)
595 {
596   GBufferedInputStreamPrivate *priv;
597
598   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
599
600   priv = stream->priv;
601
602   if (count) {
603     *count = priv->end - priv->pos;
604   }
605
606   return priv->buffer + priv->pos;
607 }
608
609 static void
610 compact_buffer (GBufferedInputStream *stream)
611 {
612   GBufferedInputStreamPrivate *priv;
613   gsize current_size;
614
615   priv = stream->priv;
616
617   current_size = priv->end - priv->pos;
618   
619   g_memmove (priv->buffer,
620              priv->buffer + priv->pos,
621              current_size);
622   
623   priv->pos = 0;
624   priv->end = current_size;
625 }
626
627 static gssize
628 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
629                                    gssize                 count,
630                                    GCancellable         *cancellable,
631                                    GError              **error)
632 {
633   GBufferedInputStreamPrivate *priv;
634   GInputStream *base_stream;
635   gssize nread;
636   gsize in_buffer;
637
638   priv = stream->priv;
639
640   if (count == -1)
641     count = priv->len;
642   
643   in_buffer = priv->end - priv->pos;
644
645   /* Never fill more than can fit in the buffer */
646   count = MIN (count, priv->len - in_buffer);
647
648   /* If requested length does not fit at end, compact */
649   if (priv->len - priv->end < count)
650     compact_buffer (stream);
651
652   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
653   nread = g_input_stream_read (base_stream,
654                                priv->buffer + priv->end,
655                                count,
656                                cancellable,
657                                error);
658
659   if (nread > 0)
660     priv->end += nread;
661   
662   return nread;
663 }
664
665 static gssize
666 g_buffered_input_stream_skip (GInputStream          *stream,
667                               gsize                  count,
668                               GCancellable          *cancellable,
669                               GError               **error)
670 {
671   GBufferedInputStream        *bstream;
672   GBufferedInputStreamPrivate *priv;
673   GInputStream *base_stream;
674   gsize available, bytes_skipped;
675   gssize nread;
676
677   bstream = G_BUFFERED_INPUT_STREAM (stream);
678   priv = bstream->priv;
679
680   available = priv->end - priv->pos;
681
682   if (count <= available)
683     {
684       priv->pos += count;
685       return count;
686     }
687
688   /* Full request not available, skip all currently availbile and request refill for more */
689   
690   priv->pos = 0;
691   priv->end = 0;
692   bytes_skipped = available;
693   count -= available;
694
695   if (bytes_skipped > 0)
696     error = NULL; /* Ignore further errors if we already read some data */
697   
698   if (count > priv->len)
699     {
700       /* Large request, shortcut buffer */
701       
702       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
703
704       nread = g_input_stream_skip (base_stream,
705                                    count,
706                                    cancellable,
707                                    error);
708       
709       if (nread < 0 && bytes_skipped == 0)
710         return -1;
711       
712       if (nread > 0)
713         bytes_skipped += nread;
714       
715       return bytes_skipped;
716     }
717   
718   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
719   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
720   g_input_stream_set_pending (stream, TRUE); /* enable again */
721   
722   if (nread < 0)
723     {
724       if (bytes_skipped == 0)
725         return -1;
726       else
727         return bytes_skipped;
728     }
729   
730   available = priv->end - priv->pos;
731   count = MIN (count, available);
732   
733   bytes_skipped += count;
734   priv->pos += count;
735   
736   return bytes_skipped;
737 }
738
739 static gssize
740 g_buffered_input_stream_read (GInputStream *stream,
741                               void         *buffer,
742                               gsize         count,
743                               GCancellable *cancellable,
744                               GError      **error)
745 {
746   GBufferedInputStream        *bstream;
747   GBufferedInputStreamPrivate *priv;
748   GInputStream *base_stream;
749   gsize available, bytes_read;
750   gssize nread;
751
752   bstream = G_BUFFERED_INPUT_STREAM (stream);
753   priv = bstream->priv;
754
755   available = priv->end - priv->pos;
756
757   if (count <= available)
758     {
759       memcpy (buffer, priv->buffer + priv->pos, count);
760       priv->pos += count;
761       return count;
762     }
763   
764   /* Full request not available, read all currently availbile and request refill for more */
765   
766   memcpy (buffer, priv->buffer + priv->pos, available);
767   priv->pos = 0;
768   priv->end = 0;
769   bytes_read = available;
770   count -= available;
771
772   if (bytes_read > 0)
773     error = NULL; /* Ignore further errors if we already read some data */
774   
775   if (count > priv->len)
776     {
777       /* Large request, shortcut buffer */
778       
779       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
780
781       nread = g_input_stream_read (base_stream,
782                                    (char *)buffer + bytes_read,
783                                    count,
784                                    cancellable,
785                                    error);
786       
787       if (nread < 0 && bytes_read == 0)
788         return -1;
789       
790       if (nread > 0)
791         bytes_read += nread;
792       
793       return bytes_read;
794     }
795   
796   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
797   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
798   g_input_stream_set_pending (stream, TRUE); /* enable again */
799   if (nread < 0)
800     {
801       if (bytes_read == 0)
802         return -1;
803       else
804         return bytes_read;
805     }
806   
807   available = priv->end - priv->pos;
808   count = MIN (count, available);
809   
810   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
811   bytes_read += count;
812   priv->pos += count;
813   
814   return bytes_read;
815 }
816
817 /**
818  * g_buffered_input_stream_read_byte:
819  * @stream: #GBufferedInputStream.
820  * @cancellable: optional #GCancellable object, %NULL to ignore.
821  * @error: location to store the error occuring, or %NULL to ignore.
822  *
823  * Tries to read a single byte from the stream or the buffer. Will block
824  * during this read.
825  *
826  * On success, the byte read from the stream is returned. On end of stream
827  * -1 is returned but it's not an exceptional error and @error is not set.
828  * 
829  * If @cancellable is not %NULL, then the operation can be cancelled by
830  * triggering the cancellable object from another thread. If the operation
831  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
832  * operation was partially finished when the operation was cancelled the
833  * partial result will be returned, without an error.
834  *
835  * On error -1 is returned and @error is set accordingly.
836  * 
837  * Returns: the byte read from the @stream, or -1 on end of stream or error.
838  **/
839 int
840 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
841                                    GCancellable          *cancellable,
842                                    GError               **error)
843 {
844   GBufferedInputStreamPrivate *priv;
845   GInputStream *input_stream;
846   gsize available;
847   gssize nread;
848
849   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
850
851   priv = stream->priv;
852   input_stream = G_INPUT_STREAM (stream);
853
854   if (g_input_stream_is_closed (input_stream))
855     {
856       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
857        _("Stream is already closed"));
858       return -1;
859     }
860
861   if (g_input_stream_has_pending (input_stream))
862     {
863       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
864        _("Stream has outstanding operation"));
865       return -1;
866     }
867
868   available = priv->end - priv->pos;
869
870   if (available < 1)
871     return priv->buffer[priv->pos++];
872
873   /* Byte not available, request refill for more */
874
875   if (cancellable)
876     g_push_current_cancellable (cancellable);
877
878   priv->pos = 0;
879   priv->end = 0;
880
881   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
882
883   if (cancellable)
884     g_pop_current_cancellable (cancellable);
885
886   if (nread <= 0)
887     return -1; /* error or end of stream */
888
889   return priv->buffer[priv->pos++];
890 }
891
892 /* ************************** */
893 /* Async stuff implementation */
894 /* ************************** */
895
896 static void
897 fill_async_callback (GObject *source_object,
898                      GAsyncResult *result,
899                      gpointer user_data)
900 {
901   GError *error;
902   gssize res;
903   GSimpleAsyncResult *simple;
904
905   simple = user_data;
906   
907   error = NULL;
908   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
909                                     result, &error);
910
911   g_simple_async_result_set_op_res_gssize (simple, res);
912   if (res == -1)
913     {
914       g_simple_async_result_set_from_error (simple, error);
915       g_error_free (error);
916     }
917   
918   /* Complete immediately, not in idle, since we're already in a mainloop callout */
919   g_simple_async_result_complete (simple);
920   g_object_unref (simple);
921 }
922
923 static void
924 g_buffered_input_stream_real_fill_async  (GBufferedInputStream *stream,
925                                           gssize                count,
926                                           int                   io_priority,
927                                           GCancellable         *cancellable,
928                                           GAsyncReadyCallback   callback,
929                                           gpointer              user_data)
930 {
931   GBufferedInputStreamPrivate *priv;
932   GInputStream *base_stream;
933   GSimpleAsyncResult *simple;
934   gsize in_buffer;
935
936   priv = stream->priv;
937
938   if (count == -1)
939     count = priv->len;
940   
941   in_buffer = priv->end - priv->pos;
942
943   /* Never fill more than can fit in the buffer */
944   count = MIN (count, priv->len - in_buffer);
945
946   /* If requested length does not fit at end, compact */
947   if (priv->len - priv->end < count)
948     compact_buffer (stream);
949
950   simple = g_simple_async_result_new (G_OBJECT (stream),
951                                       callback, user_data,
952                                       g_buffered_input_stream_real_fill_async);
953   
954   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
955   g_input_stream_read_async (base_stream,
956                              priv->buffer + priv->end,
957                              count,
958                              io_priority,
959                              cancellable,
960                              fill_async_callback,
961                              simple);
962 }
963
964 static gssize
965 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
966                                           GAsyncResult         *result,
967                                           GError              **error)
968 {
969   GSimpleAsyncResult *simple;
970   gssize nread;
971
972   simple = G_SIMPLE_ASYNC_RESULT (result);
973   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
974   
975   nread = g_simple_async_result_get_op_res_gssize (simple);
976   return nread;
977 }
978
979 typedef struct {
980   gssize bytes_read;
981   gssize count;
982   void *buffer;
983 } ReadAsyncData;
984
985 static void 
986 free_read_async_data (gpointer _data)
987 {
988   ReadAsyncData *data = _data;
989   g_slice_free (ReadAsyncData, data);
990 }
991
992 static void
993 large_read_callback (GObject *source_object,
994                      GAsyncResult *result,
995                      gpointer user_data)
996 {
997   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
998   ReadAsyncData *data;
999   GError *error;
1000   gssize nread;
1001
1002   data = g_simple_async_result_get_op_res_gpointer (simple);
1003   
1004   error = NULL;
1005   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1006                                       result, &error);
1007
1008   /* Only report the error if we've not already read some data */
1009   if (nread < 0 && data->bytes_read == 0)
1010     g_simple_async_result_set_from_error (simple, error);
1011   
1012   if (nread > 0)
1013     data->bytes_read += nread;
1014   
1015   if (error)
1016     g_error_free (error);
1017   
1018   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1019   g_simple_async_result_complete (simple);
1020   g_object_unref (simple);
1021 }
1022
1023 static void
1024 read_fill_buffer_callback (GObject *source_object,
1025                            GAsyncResult *result,
1026                            gpointer user_data)
1027 {
1028   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1029   GBufferedInputStream *bstream;
1030   GBufferedInputStreamPrivate *priv;
1031   ReadAsyncData *data;
1032   GError *error;
1033   gssize nread;
1034   gsize available;
1035
1036   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1037   priv = bstream->priv;
1038   
1039   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1040   
1041   data = g_simple_async_result_get_op_res_gpointer (simple);
1042   
1043   error = NULL;
1044   nread = g_buffered_input_stream_fill_finish (bstream,
1045                                                result, &error);
1046   
1047   if (nread < 0 && data->bytes_read == 0)
1048     g_simple_async_result_set_from_error (simple, error);
1049
1050
1051   if (nread > 0)
1052     {
1053       available = priv->end - priv->pos;
1054       data->count = MIN (data->count, available);
1055       
1056       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1057       data->bytes_read += data->count;
1058       priv->pos += data->count;
1059     }
1060
1061   if (error)
1062     g_error_free (error);
1063   
1064   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1065   g_simple_async_result_complete (simple);
1066   g_object_unref (simple);
1067 }
1068
1069 static void
1070 g_buffered_input_stream_read_async (GInputStream              *stream,
1071                                     void                      *buffer,
1072                                     gsize                      count,
1073                                     int                        io_priority,
1074                                     GCancellable              *cancellable,
1075                                     GAsyncReadyCallback        callback,
1076                                     gpointer                   user_data)
1077 {
1078   GBufferedInputStream *bstream;
1079   GBufferedInputStreamPrivate *priv;
1080   GInputStream *base_stream;
1081   gsize available;
1082   GSimpleAsyncResult *simple;
1083   ReadAsyncData *data;
1084
1085   bstream = G_BUFFERED_INPUT_STREAM (stream);
1086   priv = bstream->priv;
1087
1088   data = g_slice_new (ReadAsyncData);
1089   data->buffer = buffer;
1090   data->bytes_read = 0;
1091   simple = g_simple_async_result_new (G_OBJECT (stream),
1092                                       callback, user_data,
1093                                       g_buffered_input_stream_read_async);
1094   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1095   
1096   available = priv->end - priv->pos;
1097   
1098   if (count <= available)
1099     {
1100       memcpy (buffer, priv->buffer + priv->pos, count);
1101       priv->pos += count;
1102       data->bytes_read = count;
1103       
1104       g_simple_async_result_complete_in_idle (simple);
1105       g_object_unref (simple);
1106       return;
1107     }
1108
1109
1110   /* Full request not available, read all currently availbile and request refill for more */
1111   
1112   memcpy (buffer, priv->buffer + priv->pos, available);
1113   priv->pos = 0;
1114   priv->end = 0;
1115   
1116   count -= available;
1117   
1118   data->bytes_read = available;
1119   data->count = count;
1120
1121   if (count > priv->len)
1122     {
1123       /* Large request, shortcut buffer */
1124       
1125       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1126       
1127       g_input_stream_read_async (base_stream,
1128                                  (char *)buffer + data->bytes_read,
1129                                  count,
1130                                  io_priority, cancellable,
1131                                  large_read_callback,
1132                                  simple);
1133     }
1134   else
1135     {
1136       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1137       g_buffered_input_stream_fill_async (bstream, priv->len,
1138                                           io_priority, cancellable,
1139                                           read_fill_buffer_callback, simple);
1140     }
1141 }
1142
1143 static gssize
1144 g_buffered_input_stream_read_finish (GInputStream   *stream,
1145                                      GAsyncResult   *result,
1146                                      GError        **error)
1147 {
1148   GSimpleAsyncResult *simple;
1149   ReadAsyncData *data;
1150   
1151   simple = G_SIMPLE_ASYNC_RESULT (result);
1152   
1153   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1154
1155   data = g_simple_async_result_get_op_res_gpointer (simple);
1156   
1157   return data->bytes_read;
1158 }
1159
1160 typedef struct {
1161   gssize bytes_skipped;
1162   gssize count;
1163 } SkipAsyncData;
1164
1165 static void 
1166 free_skip_async_data (gpointer _data)
1167 {
1168   SkipAsyncData *data = _data;
1169   g_slice_free (SkipAsyncData, data);
1170 }
1171
1172 static void
1173 large_skip_callback (GObject *source_object,
1174                      GAsyncResult *result,
1175                      gpointer user_data)
1176 {
1177   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1178   SkipAsyncData *data;
1179   GError *error;
1180   gssize nread;
1181
1182   data = g_simple_async_result_get_op_res_gpointer (simple);
1183   
1184   error = NULL;
1185   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1186                                       result, &error);
1187
1188   /* Only report the error if we've not already read some data */
1189   if (nread < 0 && data->bytes_skipped == 0)
1190     g_simple_async_result_set_from_error (simple, error);
1191   
1192   if (nread > 0)
1193     data->bytes_skipped += nread;
1194   
1195   if (error)
1196     g_error_free (error);
1197   
1198   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1199   g_simple_async_result_complete (simple);
1200   g_object_unref (simple);
1201 }
1202
1203 static void
1204 skip_fill_buffer_callback (GObject *source_object,
1205                            GAsyncResult *result,
1206                            gpointer user_data)
1207 {
1208   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1209   GBufferedInputStream *bstream;
1210   GBufferedInputStreamPrivate *priv;
1211   SkipAsyncData *data;
1212   GError *error;
1213   gssize nread;
1214   gsize available;
1215
1216   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1217   priv = bstream->priv;
1218   
1219   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1220   
1221   data = g_simple_async_result_get_op_res_gpointer (simple);
1222   
1223   error = NULL;
1224   nread = g_buffered_input_stream_fill_finish (bstream,
1225                                                result, &error);
1226   
1227   if (nread < 0 && data->bytes_skipped == 0)
1228     g_simple_async_result_set_from_error (simple, error);
1229
1230
1231   if (nread > 0)
1232     {
1233       available = priv->end - priv->pos;
1234       data->count = MIN (data->count, available);
1235       
1236       data->bytes_skipped += data->count;
1237       priv->pos += data->count;
1238     }
1239
1240   if (error)
1241     g_error_free (error);
1242   
1243   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1244   g_simple_async_result_complete (simple);
1245   g_object_unref (simple);
1246 }
1247
1248 static void
1249 g_buffered_input_stream_skip_async (GInputStream              *stream,
1250                                     gsize                      count,
1251                                     int                        io_priority,
1252                                     GCancellable              *cancellable,
1253                                     GAsyncReadyCallback        callback,
1254                                     gpointer                   user_data)
1255 {
1256   GBufferedInputStream *bstream;
1257   GBufferedInputStreamPrivate *priv;
1258   GInputStream *base_stream;
1259   gsize available;
1260   GSimpleAsyncResult *simple;
1261   SkipAsyncData *data;
1262
1263   bstream = G_BUFFERED_INPUT_STREAM (stream);
1264   priv = bstream->priv;
1265
1266   data = g_slice_new (SkipAsyncData);
1267   data->bytes_skipped = 0;
1268   simple = g_simple_async_result_new (G_OBJECT (stream),
1269                                       callback, user_data,
1270                                       g_buffered_input_stream_skip_async);
1271   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1272   
1273   available = priv->end - priv->pos;
1274   
1275   if (count <= available)
1276     {
1277       priv->pos += count;
1278       data->bytes_skipped = count;
1279       
1280       g_simple_async_result_complete_in_idle (simple);
1281       g_object_unref (simple);
1282       return;
1283     }
1284
1285
1286   /* Full request not available, skip all currently availbile and request refill for more */
1287   
1288   priv->pos = 0;
1289   priv->end = 0;
1290   
1291   count -= available;
1292   
1293   data->bytes_skipped = available;
1294   data->count = count;
1295
1296   if (count > priv->len)
1297     {
1298       /* Large request, shortcut buffer */
1299       
1300       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1301       
1302       g_input_stream_skip_async (base_stream,
1303                                  count,
1304                                  io_priority, cancellable,
1305                                  large_skip_callback,
1306                                  simple);
1307     }
1308   else
1309     {
1310       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1311       g_buffered_input_stream_fill_async (bstream, priv->len,
1312                                           io_priority, cancellable,
1313                                           skip_fill_buffer_callback, simple);
1314     }
1315 }
1316
1317 static gssize
1318 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1319                                      GAsyncResult   *result,
1320                                      GError        **error)
1321 {
1322   GSimpleAsyncResult *simple;
1323   SkipAsyncData *data;
1324   
1325   simple = G_SIMPLE_ASYNC_RESULT (result);
1326   
1327   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1328
1329   data = g_simple_async_result_get_op_res_gpointer (simple);
1330   
1331   return data->bytes_skipped;
1332 }
1333
1334 /* vim: ts=2 sw=2 et */