Added. Added. Added. Added.
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @see_also: #GFilterInputStream, #GInputStream
37  * 
38  * Buffered input stream implements #GFilterInputStream and provides 
39  * for buffered reads. 
40  * 
41  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered input stream, use g_buffered_input_stream_new(), or 
44  * g_buffered_input_stream_new_sized() to specify the buffer's size at construction.
45  * 
46  * To get the size of a buffer within a buffered input stream, use 
47  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
48  * buffered input stream's buffer, use g_buffered_input_stream_set_buffer_size(). 
49  * Note: the buffer's size cannot be reduced below the size of the data within the
50  * buffer.
51  *
52  **/
53
54
55
56 #define DEFAULT_BUFFER_SIZE 4096
57
58 struct _GBufferedInputStreamPrivate {
59   guint8 *buffer;
60   gsize   len;
61   gsize   pos;
62   gsize   end;
63   GAsyncReadyCallback outstanding_callback;
64 };
65
66 enum {
67   PROP_0,
68   PROP_BUFSIZE
69 };
70
71 static void g_buffered_input_stream_set_property  (GObject      *object,
72                                                    guint         prop_id,
73                                                    const GValue *value,
74                                                    GParamSpec   *pspec);
75
76 static void g_buffered_input_stream_get_property  (GObject      *object,
77                                                    guint         prop_id,
78                                                    GValue       *value,
79                                                    GParamSpec   *pspec);
80 static void g_buffered_input_stream_finalize      (GObject *object);
81
82
83 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
84                                                         gsize                  count,
85                                                         GCancellable          *cancellable,
86                                                         GError               **error);
87 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
88                                                         gsize                  count,
89                                                         int                    io_priority,
90                                                         GCancellable          *cancellable,
91                                                         GAsyncReadyCallback    callback,
92                                                         gpointer               user_data);
93 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
94                                                         GAsyncResult          *result,
95                                                         GError               **error);
96 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
97                                                         void                  *buffer,
98                                                         gsize                  count,
99                                                         GCancellable          *cancellable,
100                                                         GError               **error);
101 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
102                                                         void                  *buffer,
103                                                         gsize                  count,
104                                                         int                    io_priority,
105                                                         GCancellable          *cancellable,
106                                                         GAsyncReadyCallback    callback,
107                                                         gpointer               user_data);
108 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
109                                                         GAsyncResult          *result,
110                                                         GError               **error);
111 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
112                                                         gssize                 count,
113                                                         GCancellable          *cancellable,
114                                                         GError               **error);
115 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
116                                                         gssize                 count,
117                                                         int                    io_priority,
118                                                         GCancellable          *cancellable,
119                                                         GAsyncReadyCallback    callback,
120                                                         gpointer               user_data);
121 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
122                                                         GAsyncResult          *result,
123                                                         GError               **error);
124
125 static void compact_buffer (GBufferedInputStream *stream);
126
127 G_DEFINE_TYPE (GBufferedInputStream,
128                g_buffered_input_stream,
129                G_TYPE_FILTER_INPUT_STREAM)
130
131
132 static void
133 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
134 {
135   GObjectClass *object_class;
136   GInputStreamClass *istream_class;
137   GBufferedInputStreamClass *bstream_class;
138
139   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
140
141   object_class = G_OBJECT_CLASS (klass);
142   object_class->get_property = g_buffered_input_stream_get_property;
143   object_class->set_property = g_buffered_input_stream_set_property;
144   object_class->finalize     = g_buffered_input_stream_finalize;
145
146   istream_class = G_INPUT_STREAM_CLASS (klass);
147   istream_class->skip = g_buffered_input_stream_skip;
148   istream_class->skip_async  = g_buffered_input_stream_skip_async;
149   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
150   istream_class->read = g_buffered_input_stream_read;
151   istream_class->read_async  = g_buffered_input_stream_read_async;
152   istream_class->read_finish = g_buffered_input_stream_read_finish;
153
154   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
155   bstream_class->fill = g_buffered_input_stream_real_fill;
156   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
157   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
158   
159   g_object_class_install_property (object_class,
160                                    PROP_BUFSIZE,
161                                    g_param_spec_uint ("buffer-size",
162                                                       P_("Buffer Size"),
163                                                       P_("The size of the backend buffer"),
164                                                       1,
165                                                       G_MAXUINT,
166                                                       DEFAULT_BUFFER_SIZE,
167                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
168                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
169
170
171 }
172
173 /**
174  * g_buffered_input_stream_get_buffer_size:
175  * @stream: #GBufferedInputStream.
176  * 
177  * Gets the size of the input buffer.
178  * 
179  * Returns: the current buffer size, or %-1 on error.
180  **/
181 gsize
182 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
183 {
184   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
185
186   return stream->priv->len;
187 }
188
189 /**
190  * g_buffered_input_stream_set_buffer_size:
191  * @stream: #GBufferedInputStream.
192  * @size: a #gsize.
193  *
194  * Sets the size of the internal buffer of @stream to @size, or to the 
195  * size of the contents of the buffer. The buffer can never be resized 
196  * smaller than its current contents.
197  **/
198 void
199 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
200                                          gsize                  size)
201 {
202   GBufferedInputStreamPrivate *priv;
203   gsize in_buffer;
204   guint8 *buffer;
205   
206   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
207
208   priv = stream->priv;
209
210   if (priv->buffer)
211     {
212       in_buffer = priv->end - priv->pos;
213
214       /* Never resize smaller than current buffer contents */
215       size = MAX (size, in_buffer);
216
217       buffer = g_malloc (size);
218       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
219       priv->len = size;
220       priv->pos = 0;
221       priv->end = in_buffer;
222       g_free (priv->buffer);
223       priv->buffer = buffer;
224     }
225   else
226     {
227       priv->len = size;
228       priv->pos = 0;
229       priv->end = 0;
230       priv->buffer = g_malloc (size);
231     }
232 }
233
234 static void
235 g_buffered_input_stream_set_property (GObject         *object,
236                                       guint            prop_id,
237                                       const GValue    *value,
238                                       GParamSpec      *pspec)
239 {
240   GBufferedInputStreamPrivate *priv;
241   GBufferedInputStream        *bstream;
242
243   bstream = G_BUFFERED_INPUT_STREAM (object);
244   priv = bstream->priv;
245
246   switch (prop_id) 
247     {
248     case PROP_BUFSIZE:
249       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
250       break;
251
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255     }
256
257 }
258
259 static void
260 g_buffered_input_stream_get_property (GObject    *object,
261                                       guint       prop_id,
262                                       GValue     *value,
263                                       GParamSpec *pspec)
264 {
265   GBufferedInputStreamPrivate *priv;
266   GBufferedInputStream        *bstream;
267
268   bstream = G_BUFFERED_INPUT_STREAM (object);
269   priv = bstream->priv;
270
271   switch (prop_id)
272     { 
273     
274     case PROP_BUFSIZE:
275       g_value_set_uint (value, priv->len);
276       break;
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280     }
281
282 }
283
284 static void
285 g_buffered_input_stream_finalize (GObject *object)
286 {
287   GBufferedInputStreamPrivate *priv;
288   GBufferedInputStream        *stream;
289
290   stream = G_BUFFERED_INPUT_STREAM (object);
291   priv = stream->priv;
292
293   g_free (priv->buffer);
294
295   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
296     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
297 }
298
299 static void
300 g_buffered_input_stream_init (GBufferedInputStream *stream)
301 {
302   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
303                                               G_TYPE_BUFFERED_INPUT_STREAM,
304                                               GBufferedInputStreamPrivate);
305 }
306
307
308 /**
309  * g_buffered_input_stream_new:
310  * @base_stream: a #GInputStream.
311  * 
312  * Creates a new #GInputStream from the given @base_stream, with 
313  * a buffer set to the default size (4 kilobytes).
314  *
315  * Returns: a #GInputStream for the given @base_stream.
316  **/
317 GInputStream *
318 g_buffered_input_stream_new (GInputStream *base_stream)
319 {
320   GInputStream *stream;
321
322   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
323
324   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
325                          "base-stream", base_stream,
326                          NULL);
327
328   return stream;
329 }
330
331 /**
332  * g_buffered_input_stream_new_sized:
333  * @base_stream: a #GOutputStream.
334  * @size: a #gsize.
335  * 
336  * Creates a new #GBufferedInputStream from the given @base_stream, with 
337  * a buffer set to @size.
338  *
339  * Returns: a #GInputStream.
340  **/
341 GInputStream *
342 g_buffered_input_stream_new_sized (GInputStream *base_stream,
343                                    gsize         size)
344 {
345   GInputStream *stream;
346
347   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
348
349   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
350                          "base-stream", base_stream,
351                          "buffer-size", (guint)size,
352                          NULL);
353
354   return stream;
355 }
356
357 /**
358  * g_buffered_input_stream_fill:
359  * @stream: #GBufferedInputStream.
360  * @count: the number of bytes that will be read from the stream.
361  * @cancellable: optional #GCancellable object, %NULL to ignore.
362  * @error: location to store the error occuring, or %NULL to ignore.
363  *
364  * Tries to read @count bytes from the stream into the buffer. 
365  * Will block during this read.
366  * 
367  * If @count is zero, returns zero and does nothing. A value of @count
368  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
369  *
370  * On success, the number of bytes read into the buffer is returned.
371  * It is not an error if this is not the same as the requested size, as it
372  * can happen e.g. near the end of a file. Zero is returned on end of file
373  * (or if @count is zero),  but never otherwise.
374  *
375  * If @cancellable is not %NULL, then the operation can be cancelled by
376  * triggering the cancellable object from another thread. If the operation
377  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
378  * operation was partially finished when the operation was cancelled the
379  * partial result will be returned, without an error.
380  *
381  * On error -1 is returned and @error is set accordingly.
382  * 
383  * For the asynchronous, non-blocking, version of this function, see 
384  * g_buffered_input_stream_fill_async().
385  *
386  * Returns: the number of bytes read into @stream's buffer, up to @count, 
387  * or -1 on error.
388  **/
389 gssize
390 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
391                               gssize                 count,
392                               GCancellable          *cancellable,
393                               GError               **error)
394 {
395   GBufferedInputStreamClass *class;
396   GInputStream *input_stream;
397   gssize res;
398
399   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
400   
401   input_stream = G_INPUT_STREAM (stream);
402   
403   if (g_input_stream_is_closed (input_stream))
404     {
405       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
406                    _("Stream is already closed"));
407       return -1;
408     }
409   
410   if (g_input_stream_has_pending (input_stream))
411     {
412       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
413                    _("Stream has outstanding operation"));
414       return -1;
415     }
416       
417   g_input_stream_set_pending (input_stream, TRUE);
418
419   if (cancellable)
420     g_push_current_cancellable (cancellable);
421   
422   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
423   res = class->fill (stream, count, cancellable, error);
424
425   if (cancellable)
426     g_pop_current_cancellable (cancellable);
427   
428   g_input_stream_set_pending (input_stream, FALSE);
429   
430   return res;
431 }
432
433 static void
434 async_fill_callback_wrapper (GObject *source_object,
435                              GAsyncResult *res,
436                              gpointer      user_data)
437 {
438   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
439
440   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
441   (*stream->priv->outstanding_callback) (source_object, res, user_data);
442   g_object_unref (stream);
443 }
444
445 /**
446  * g_buffered_input_stream_fill_async:
447  * @stream: #GBufferedInputStream.
448  * @count: a #gssize.
449  * @io_priority: the io priority of the request. the io priority of the request.
450  * @cancellable: optional #GCancellable object
451  * @callback: a #GAsyncReadyCallback.
452  * @user_data: a #gpointer.
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  * 
458  **/
459
460 void
461 g_buffered_input_stream_fill_async (GBufferedInputStream  *stream,
462                                     gssize                 count,
463                                     int                    io_priority,
464                                     GCancellable          *cancellable,
465                                     GAsyncReadyCallback    callback,
466                                     gpointer               user_data)
467 {
468   GBufferedInputStreamClass *class;
469   GSimpleAsyncResult *simple;
470
471   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
472
473   if (count == 0)
474     {
475       simple = g_simple_async_result_new (G_OBJECT (stream),
476                                           callback,
477                                           user_data,
478                                           g_buffered_input_stream_fill_async);
479       g_simple_async_result_complete_in_idle (simple);
480       g_object_unref (simple);
481       return;
482     }
483   
484   if (((gssize) count) < 0)
485     {
486       g_simple_async_report_error_in_idle (G_OBJECT (stream),
487                                            callback,
488                                            user_data,
489                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
490                                            _("Too large count value passed to g_input_stream_read_async"));
491       return;
492     }
493   
494   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
495     {
496       g_simple_async_report_error_in_idle (G_OBJECT (stream),
497                                            callback,
498                                            user_data,
499                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
500                                            _("Stream is already closed"));
501       return;
502     }
503     
504   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
505     {
506       g_simple_async_report_error_in_idle (G_OBJECT (stream),
507                                            callback,
508                                            user_data,
509                                            G_IO_ERROR, G_IO_ERROR_PENDING,
510                                            _("Stream has outstanding operation"));
511       return;
512     }
513
514   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
515   
516   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
517   stream->priv->outstanding_callback = callback;
518   g_object_ref (stream);
519   class->fill_async (stream, count, io_priority, cancellable,
520                      async_fill_callback_wrapper, user_data);
521 }
522
523 /**
524  * g_buffered_input_stream_fill_finish:
525  * @stream: a #GBufferedInputStream.
526  * @result: a #GAsyncResult.
527  * @error: a #GError.
528  *
529  * Finishes an asynchronous read.
530  * 
531  * Returns: a #gssize of the read stream, or %-1 on an error. 
532  **/
533 gssize
534 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
535                                      GAsyncResult          *result,
536                                      GError               **error)
537 {
538   GSimpleAsyncResult *simple;
539   GBufferedInputStreamClass *class;
540
541   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
542   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
543
544   if (G_IS_SIMPLE_ASYNC_RESULT (result))
545     {
546       simple = G_SIMPLE_ASYNC_RESULT (result);
547       if (g_simple_async_result_propagate_error (simple, error))
548         return -1;
549
550       /* Special case read of 0 bytes */
551       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
552         return 0;
553     }
554
555   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
556   return class->fill_finish (stream, result, error);
557 }
558
559 /**
560  * g_buffered_input_stream_get_available:
561  * @stream: #GBufferedInputStream.
562  * 
563  * Gets the size of the available data within the stream.
564  * 
565  * Returns: size of the available stream. 
566  **/
567 gsize
568 g_buffered_input_stream_get_available (GBufferedInputStream  *stream)
569 {
570   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
571
572   return stream->priv->end - stream->priv->pos;
573 }
574
575 /**
576  * g_buffered_input_stream_peek:
577  * @stream: a #GBufferedInputStream.
578  * @buffer: a pointer to an allocated chunk of memory.
579  * @offset: a #gsize.
580  * @count: a #gsize.
581  * 
582  * Peeks in the buffer, copying data of size @count into @buffer, offset
583  * @offset bytes.
584  * 
585  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
586  **/
587 gsize
588 g_buffered_input_stream_peek (GBufferedInputStream  *stream,
589                               void                  *buffer,
590                               gsize                  offset,
591                               gsize                  count)
592 {
593   gsize available;
594   gsize end;
595   
596   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
597   g_return_val_if_fail (buffer != NULL, -1);
598
599   available = g_buffered_input_stream_get_available (stream);
600
601   if (offset > available)
602     return 0;
603   
604   end = MIN (offset + count, available);
605   count = end - offset;
606   
607   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
608   return count;
609 }
610
611 /**
612  * g_buffered_input_stream_peek_buffer:
613  * @stream: a #GBufferedInputStream.
614  * @count: a #gsize to get the number of bytes available in the buffer.
615  *
616  * Returns the buffer with the currently available bytes. The returned
617  * buffer must not be modified and will become invalid when reading from
618  * the stream or filling the buffer.
619  *
620  * Returns: read-only buffer
621  **/
622 const void*
623 g_buffered_input_stream_peek_buffer (GBufferedInputStream  *stream,
624                                      gsize                 *count)
625 {
626   GBufferedInputStreamPrivate *priv;
627
628   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
629
630   priv = stream->priv;
631
632   if (count) {
633     *count = priv->end - priv->pos;
634   }
635
636   return priv->buffer + priv->pos;
637 }
638
639 static void
640 compact_buffer (GBufferedInputStream *stream)
641 {
642   GBufferedInputStreamPrivate *priv;
643   gsize current_size;
644
645   priv = stream->priv;
646
647   current_size = priv->end - priv->pos;
648   
649   g_memmove (priv->buffer,
650              priv->buffer + priv->pos,
651              current_size);
652   
653   priv->pos = 0;
654   priv->end = current_size;
655 }
656
657 static gssize
658 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
659                                    gssize                 count,
660                                    GCancellable         *cancellable,
661                                    GError              **error)
662 {
663   GBufferedInputStreamPrivate *priv;
664   GInputStream *base_stream;
665   gssize nread;
666   gsize in_buffer;
667
668   priv = stream->priv;
669
670   if (count == -1)
671     count = priv->len;
672   
673   in_buffer = priv->end - priv->pos;
674
675   /* Never fill more than can fit in the buffer */
676   count = MIN (count, priv->len - in_buffer);
677
678   /* If requested length does not fit at end, compact */
679   if (priv->len - priv->end < count)
680     compact_buffer (stream);
681
682   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
683   nread = g_input_stream_read (base_stream,
684                                priv->buffer + priv->end,
685                                count,
686                                cancellable,
687                                error);
688
689   if (nread > 0)
690     priv->end += nread;
691   
692   return nread;
693 }
694
695 static gssize
696 g_buffered_input_stream_skip (GInputStream          *stream,
697                               gsize                  count,
698                               GCancellable          *cancellable,
699                               GError               **error)
700 {
701   GBufferedInputStream        *bstream;
702   GBufferedInputStreamPrivate *priv;
703   GInputStream *base_stream;
704   gsize available, bytes_skipped;
705   gssize nread;
706
707   bstream = G_BUFFERED_INPUT_STREAM (stream);
708   priv = bstream->priv;
709
710   available = priv->end - priv->pos;
711
712   if (count <= available)
713     {
714       priv->pos += count;
715       return count;
716     }
717
718   /* Full request not available, skip all currently availbile and request refill for more */
719   
720   priv->pos = 0;
721   priv->end = 0;
722   bytes_skipped = available;
723   count -= available;
724
725   if (bytes_skipped > 0)
726     error = NULL; /* Ignore further errors if we already read some data */
727   
728   if (count > priv->len)
729     {
730       /* Large request, shortcut buffer */
731       
732       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
733
734       nread = g_input_stream_skip (base_stream,
735                                    count,
736                                    cancellable,
737                                    error);
738       
739       if (nread < 0 && bytes_skipped == 0)
740         return -1;
741       
742       if (nread > 0)
743         bytes_skipped += nread;
744       
745       return bytes_skipped;
746     }
747   
748   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
749   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
750   g_input_stream_set_pending (stream, TRUE); /* enable again */
751   
752   if (nread < 0)
753     {
754       if (bytes_skipped == 0)
755         return -1;
756       else
757         return bytes_skipped;
758     }
759   
760   available = priv->end - priv->pos;
761   count = MIN (count, available);
762   
763   bytes_skipped += count;
764   priv->pos += count;
765   
766   return bytes_skipped;
767 }
768
769 static gssize
770 g_buffered_input_stream_read (GInputStream *stream,
771                               void         *buffer,
772                               gsize         count,
773                               GCancellable *cancellable,
774                               GError      **error)
775 {
776   GBufferedInputStream        *bstream;
777   GBufferedInputStreamPrivate *priv;
778   GInputStream *base_stream;
779   gsize available, bytes_read;
780   gssize nread;
781
782   bstream = G_BUFFERED_INPUT_STREAM (stream);
783   priv = bstream->priv;
784
785   available = priv->end - priv->pos;
786
787   if (count <= available)
788     {
789       memcpy (buffer, priv->buffer + priv->pos, count);
790       priv->pos += count;
791       return count;
792     }
793   
794   /* Full request not available, read all currently availbile and request refill for more */
795   
796   memcpy (buffer, priv->buffer + priv->pos, available);
797   priv->pos = 0;
798   priv->end = 0;
799   bytes_read = available;
800   count -= available;
801
802   if (bytes_read > 0)
803     error = NULL; /* Ignore further errors if we already read some data */
804   
805   if (count > priv->len)
806     {
807       /* Large request, shortcut buffer */
808       
809       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
810
811       nread = g_input_stream_read (base_stream,
812                                    (char *)buffer + bytes_read,
813                                    count,
814                                    cancellable,
815                                    error);
816       
817       if (nread < 0 && bytes_read == 0)
818         return -1;
819       
820       if (nread > 0)
821         bytes_read += nread;
822       
823       return bytes_read;
824     }
825   
826   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
827   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
828   g_input_stream_set_pending (stream, TRUE); /* enable again */
829   if (nread < 0)
830     {
831       if (bytes_read == 0)
832         return -1;
833       else
834         return bytes_read;
835     }
836   
837   available = priv->end - priv->pos;
838   count = MIN (count, available);
839   
840   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
841   bytes_read += count;
842   priv->pos += count;
843   
844   return bytes_read;
845 }
846
847 /**
848  * g_buffered_input_stream_read_byte:
849  * @stream: #GBufferedInputStream.
850  * @cancellable: optional #GCancellable object, %NULL to ignore.
851  * @error: location to store the error occuring, or %NULL to ignore.
852  *
853  * Tries to read a single byte from the stream or the buffer. Will block
854  * during this read.
855  *
856  * On success, the byte read from the stream is returned. On end of stream
857  * -1 is returned but it's not an exceptional error and @error is not set.
858  * 
859  * If @cancellable is not %NULL, then the operation can be cancelled by
860  * triggering the cancellable object from another thread. If the operation
861  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
862  * operation was partially finished when the operation was cancelled the
863  * partial result will be returned, without an error.
864  *
865  * On error -1 is returned and @error is set accordingly.
866  * 
867  * Returns: the byte read from the @stream, or -1 on end of stream or error.
868  **/
869 int
870 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
871                                    GCancellable          *cancellable,
872                                    GError               **error)
873 {
874   GBufferedInputStreamPrivate *priv;
875   GInputStream *input_stream;
876   gsize available;
877   gssize nread;
878
879   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
880
881   priv = stream->priv;
882   input_stream = G_INPUT_STREAM (stream);
883
884   if (g_input_stream_is_closed (input_stream))
885     {
886       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
887        _("Stream is already closed"));
888       return -1;
889     }
890
891   if (g_input_stream_has_pending (input_stream))
892     {
893       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
894        _("Stream has outstanding operation"));
895       return -1;
896     }
897
898   available = priv->end - priv->pos;
899
900   if (available < 1)
901     return priv->buffer[priv->pos++];
902
903   /* Byte not available, request refill for more */
904
905   if (cancellable)
906     g_push_current_cancellable (cancellable);
907
908   priv->pos = 0;
909   priv->end = 0;
910
911   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
912
913   if (cancellable)
914     g_pop_current_cancellable (cancellable);
915
916   if (nread <= 0)
917     return -1; /* error or end of stream */
918
919   return priv->buffer[priv->pos++];
920 }
921
922 /* ************************** */
923 /* Async stuff implementation */
924 /* ************************** */
925
926 static void
927 fill_async_callback (GObject *source_object,
928                      GAsyncResult *result,
929                      gpointer user_data)
930 {
931   GError *error;
932   gssize res;
933   GSimpleAsyncResult *simple;
934
935   simple = user_data;
936   
937   error = NULL;
938   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
939                                     result, &error);
940
941   g_simple_async_result_set_op_res_gssize (simple, res);
942   if (res == -1)
943     {
944       g_simple_async_result_set_from_error (simple, error);
945       g_error_free (error);
946     }
947   
948   /* Complete immediately, not in idle, since we're already in a mainloop callout */
949   g_simple_async_result_complete (simple);
950   g_object_unref (simple);
951 }
952
953 static void
954 g_buffered_input_stream_real_fill_async  (GBufferedInputStream *stream,
955                                           gssize                count,
956                                           int                   io_priority,
957                                           GCancellable         *cancellable,
958                                           GAsyncReadyCallback   callback,
959                                           gpointer              user_data)
960 {
961   GBufferedInputStreamPrivate *priv;
962   GInputStream *base_stream;
963   GSimpleAsyncResult *simple;
964   gsize in_buffer;
965
966   priv = stream->priv;
967
968   if (count == -1)
969     count = priv->len;
970   
971   in_buffer = priv->end - priv->pos;
972
973   /* Never fill more than can fit in the buffer */
974   count = MIN (count, priv->len - in_buffer);
975
976   /* If requested length does not fit at end, compact */
977   if (priv->len - priv->end < count)
978     compact_buffer (stream);
979
980   simple = g_simple_async_result_new (G_OBJECT (stream),
981                                       callback, user_data,
982                                       g_buffered_input_stream_real_fill_async);
983   
984   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
985   g_input_stream_read_async (base_stream,
986                              priv->buffer + priv->end,
987                              count,
988                              io_priority,
989                              cancellable,
990                              fill_async_callback,
991                              simple);
992 }
993
994 static gssize
995 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
996                                           GAsyncResult         *result,
997                                           GError              **error)
998 {
999   GSimpleAsyncResult *simple;
1000   gssize nread;
1001
1002   simple = G_SIMPLE_ASYNC_RESULT (result);
1003   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1004   
1005   nread = g_simple_async_result_get_op_res_gssize (simple);
1006   return nread;
1007 }
1008
1009 typedef struct {
1010   gssize bytes_read;
1011   gssize count;
1012   void *buffer;
1013 } ReadAsyncData;
1014
1015 static void 
1016 free_read_async_data (gpointer _data)
1017 {
1018   ReadAsyncData *data = _data;
1019   g_slice_free (ReadAsyncData, data);
1020 }
1021
1022 static void
1023 large_read_callback (GObject *source_object,
1024                      GAsyncResult *result,
1025                      gpointer user_data)
1026 {
1027   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1028   ReadAsyncData *data;
1029   GError *error;
1030   gssize nread;
1031
1032   data = g_simple_async_result_get_op_res_gpointer (simple);
1033   
1034   error = NULL;
1035   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1036                                       result, &error);
1037
1038   /* Only report the error if we've not already read some data */
1039   if (nread < 0 && data->bytes_read == 0)
1040     g_simple_async_result_set_from_error (simple, error);
1041   
1042   if (nread > 0)
1043     data->bytes_read += nread;
1044   
1045   if (error)
1046     g_error_free (error);
1047   
1048   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1049   g_simple_async_result_complete (simple);
1050   g_object_unref (simple);
1051 }
1052
1053 static void
1054 read_fill_buffer_callback (GObject *source_object,
1055                            GAsyncResult *result,
1056                            gpointer user_data)
1057 {
1058   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1059   GBufferedInputStream *bstream;
1060   GBufferedInputStreamPrivate *priv;
1061   ReadAsyncData *data;
1062   GError *error;
1063   gssize nread;
1064   gsize available;
1065
1066   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1067   priv = bstream->priv;
1068   
1069   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1070   
1071   data = g_simple_async_result_get_op_res_gpointer (simple);
1072   
1073   error = NULL;
1074   nread = g_buffered_input_stream_fill_finish (bstream,
1075                                                result, &error);
1076   
1077   if (nread < 0 && data->bytes_read == 0)
1078     g_simple_async_result_set_from_error (simple, error);
1079
1080
1081   if (nread > 0)
1082     {
1083       available = priv->end - priv->pos;
1084       data->count = MIN (data->count, available);
1085       
1086       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1087       data->bytes_read += data->count;
1088       priv->pos += data->count;
1089     }
1090
1091   if (error)
1092     g_error_free (error);
1093   
1094   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1095   g_simple_async_result_complete (simple);
1096   g_object_unref (simple);
1097 }
1098
1099 static void
1100 g_buffered_input_stream_read_async (GInputStream              *stream,
1101                                     void                      *buffer,
1102                                     gsize                      count,
1103                                     int                        io_priority,
1104                                     GCancellable              *cancellable,
1105                                     GAsyncReadyCallback        callback,
1106                                     gpointer                   user_data)
1107 {
1108   GBufferedInputStream *bstream;
1109   GBufferedInputStreamPrivate *priv;
1110   GInputStream *base_stream;
1111   gsize available;
1112   GSimpleAsyncResult *simple;
1113   ReadAsyncData *data;
1114
1115   bstream = G_BUFFERED_INPUT_STREAM (stream);
1116   priv = bstream->priv;
1117
1118   data = g_slice_new (ReadAsyncData);
1119   data->buffer = buffer;
1120   data->bytes_read = 0;
1121   simple = g_simple_async_result_new (G_OBJECT (stream),
1122                                       callback, user_data,
1123                                       g_buffered_input_stream_read_async);
1124   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1125   
1126   available = priv->end - priv->pos;
1127   
1128   if (count <= available)
1129     {
1130       memcpy (buffer, priv->buffer + priv->pos, count);
1131       priv->pos += count;
1132       data->bytes_read = count;
1133       
1134       g_simple_async_result_complete_in_idle (simple);
1135       g_object_unref (simple);
1136       return;
1137     }
1138
1139
1140   /* Full request not available, read all currently availbile and request refill for more */
1141   
1142   memcpy (buffer, priv->buffer + priv->pos, available);
1143   priv->pos = 0;
1144   priv->end = 0;
1145   
1146   count -= available;
1147   
1148   data->bytes_read = available;
1149   data->count = count;
1150
1151   if (count > priv->len)
1152     {
1153       /* Large request, shortcut buffer */
1154       
1155       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1156       
1157       g_input_stream_read_async (base_stream,
1158                                  (char *)buffer + data->bytes_read,
1159                                  count,
1160                                  io_priority, cancellable,
1161                                  large_read_callback,
1162                                  simple);
1163     }
1164   else
1165     {
1166       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1167       g_buffered_input_stream_fill_async (bstream, priv->len,
1168                                           io_priority, cancellable,
1169                                           read_fill_buffer_callback, simple);
1170     }
1171 }
1172
1173 static gssize
1174 g_buffered_input_stream_read_finish (GInputStream   *stream,
1175                                      GAsyncResult   *result,
1176                                      GError        **error)
1177 {
1178   GSimpleAsyncResult *simple;
1179   ReadAsyncData *data;
1180   
1181   simple = G_SIMPLE_ASYNC_RESULT (result);
1182   
1183   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1184
1185   data = g_simple_async_result_get_op_res_gpointer (simple);
1186   
1187   return data->bytes_read;
1188 }
1189
1190 typedef struct {
1191   gssize bytes_skipped;
1192   gssize count;
1193 } SkipAsyncData;
1194
1195 static void 
1196 free_skip_async_data (gpointer _data)
1197 {
1198   SkipAsyncData *data = _data;
1199   g_slice_free (SkipAsyncData, data);
1200 }
1201
1202 static void
1203 large_skip_callback (GObject *source_object,
1204                      GAsyncResult *result,
1205                      gpointer user_data)
1206 {
1207   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1208   SkipAsyncData *data;
1209   GError *error;
1210   gssize nread;
1211
1212   data = g_simple_async_result_get_op_res_gpointer (simple);
1213   
1214   error = NULL;
1215   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1216                                       result, &error);
1217
1218   /* Only report the error if we've not already read some data */
1219   if (nread < 0 && data->bytes_skipped == 0)
1220     g_simple_async_result_set_from_error (simple, error);
1221   
1222   if (nread > 0)
1223     data->bytes_skipped += nread;
1224   
1225   if (error)
1226     g_error_free (error);
1227   
1228   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1229   g_simple_async_result_complete (simple);
1230   g_object_unref (simple);
1231 }
1232
1233 static void
1234 skip_fill_buffer_callback (GObject *source_object,
1235                            GAsyncResult *result,
1236                            gpointer user_data)
1237 {
1238   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1239   GBufferedInputStream *bstream;
1240   GBufferedInputStreamPrivate *priv;
1241   SkipAsyncData *data;
1242   GError *error;
1243   gssize nread;
1244   gsize available;
1245
1246   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1247   priv = bstream->priv;
1248   
1249   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1250   
1251   data = g_simple_async_result_get_op_res_gpointer (simple);
1252   
1253   error = NULL;
1254   nread = g_buffered_input_stream_fill_finish (bstream,
1255                                                result, &error);
1256   
1257   if (nread < 0 && data->bytes_skipped == 0)
1258     g_simple_async_result_set_from_error (simple, error);
1259
1260
1261   if (nread > 0)
1262     {
1263       available = priv->end - priv->pos;
1264       data->count = MIN (data->count, available);
1265       
1266       data->bytes_skipped += data->count;
1267       priv->pos += data->count;
1268     }
1269
1270   if (error)
1271     g_error_free (error);
1272   
1273   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1274   g_simple_async_result_complete (simple);
1275   g_object_unref (simple);
1276 }
1277
1278 static void
1279 g_buffered_input_stream_skip_async (GInputStream              *stream,
1280                                     gsize                      count,
1281                                     int                        io_priority,
1282                                     GCancellable              *cancellable,
1283                                     GAsyncReadyCallback        callback,
1284                                     gpointer                   user_data)
1285 {
1286   GBufferedInputStream *bstream;
1287   GBufferedInputStreamPrivate *priv;
1288   GInputStream *base_stream;
1289   gsize available;
1290   GSimpleAsyncResult *simple;
1291   SkipAsyncData *data;
1292
1293   bstream = G_BUFFERED_INPUT_STREAM (stream);
1294   priv = bstream->priv;
1295
1296   data = g_slice_new (SkipAsyncData);
1297   data->bytes_skipped = 0;
1298   simple = g_simple_async_result_new (G_OBJECT (stream),
1299                                       callback, user_data,
1300                                       g_buffered_input_stream_skip_async);
1301   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1302   
1303   available = priv->end - priv->pos;
1304   
1305   if (count <= available)
1306     {
1307       priv->pos += count;
1308       data->bytes_skipped = count;
1309       
1310       g_simple_async_result_complete_in_idle (simple);
1311       g_object_unref (simple);
1312       return;
1313     }
1314
1315
1316   /* Full request not available, skip all currently availbile and request refill for more */
1317   
1318   priv->pos = 0;
1319   priv->end = 0;
1320   
1321   count -= available;
1322   
1323   data->bytes_skipped = available;
1324   data->count = count;
1325
1326   if (count > priv->len)
1327     {
1328       /* Large request, shortcut buffer */
1329       
1330       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1331       
1332       g_input_stream_skip_async (base_stream,
1333                                  count,
1334                                  io_priority, cancellable,
1335                                  large_skip_callback,
1336                                  simple);
1337     }
1338   else
1339     {
1340       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1341       g_buffered_input_stream_fill_async (bstream, priv->len,
1342                                           io_priority, cancellable,
1343                                           skip_fill_buffer_callback, simple);
1344     }
1345 }
1346
1347 static gssize
1348 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1349                                      GAsyncResult   *result,
1350                                      GError        **error)
1351 {
1352   GSimpleAsyncResult *simple;
1353   SkipAsyncData *data;
1354   
1355   simple = G_SIMPLE_ASYNC_RESULT (result);
1356   
1357   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1358
1359   data = g_simple_async_result_get_op_res_gpointer (simple);
1360   
1361   return data->bytes_skipped;
1362 }
1363
1364
1365 #define __G_BUFFERED_INPUT_STREAM_C__
1366 #include "gioaliasdef.c"
1367
1368 /* vim: ts=2 sw=2 et */