Explain I/O priorities
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:gbufferedinputstream
35  * @short_description: Buffered Input Stream
36  * @see_also: #GFilterInputStream, #GInputStream
37  * 
38  * Buffered input stream implements #GFilterInputStream and provides 
39  * for buffered reads. 
40  * 
41  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered input stream, use g_buffered_input_stream_new(), or 
44  * g_buffered_input_stream_new_sized() to specify the buffer's size at construction.
45  * 
46  * To get the size of a buffer within a buffered input stream, use 
47  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
48  * buffered input stream's buffer, use g_buffered_input_stream_set_buffer_size(). 
49  * Note: the buffer's size cannot be reduced below the size of the data within the
50  * buffer.
51  *
52  **/
53
54
55
56 #define DEFAULT_BUFFER_SIZE 4096 /* default value of the "buffer-size" property */
57
/* Private per-instance state of a GBufferedInputStream. */
58 struct _GBufferedInputStreamPrivate {
59   guint8 *buffer; /* backing storage; allocated in set_buffer_size(), released in finalize */
60   gsize   len;    /* allocated size of buffer */
61   gsize   pos;    /* offset in buffer of the next unread byte */
62   gsize   end;    /* offset in buffer one past the last byte filled from the base stream */
63   GAsyncReadyCallback outstanding_callback; /* caller's callback saved by fill_async() and invoked by async_fill_callback_wrapper() */
64 };
65
/* Property identifiers used by set_property()/get_property(). */
66 enum {
67   PROP_0,
68   PROP_BUFSIZE /* id of the "buffer-size" property */
69 };
70
/* GObject virtual methods: property accessors and finalizer. */
71 static void g_buffered_input_stream_set_property  (GObject      *object,
72                                                    guint         prop_id,
73                                                    const GValue *value,
74                                                    GParamSpec   *pspec);
75
76 static void g_buffered_input_stream_get_property  (GObject      *object,
77                                                    guint         prop_id,
78                                                    GValue       *value,
79                                                    GParamSpec   *pspec);
80 static void g_buffered_input_stream_finalize      (GObject *object);
81
82
/* GInputStream virtual methods; class_init installs them in the class. */
83 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
84                                                         gsize                  count,
85                                                         GCancellable          *cancellable,
86                                                         GError               **error);
87 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
88                                                         gsize                  count,
89                                                         int                    io_priority,
90                                                         GCancellable          *cancellable,
91                                                         GAsyncReadyCallback    callback,
92                                                         gpointer               user_data);
93 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
94                                                         GAsyncResult          *result,
95                                                         GError               **error);
96 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
97                                                         void                  *buffer,
98                                                         gsize                  count,
99                                                         GCancellable          *cancellable,
100                                                         GError               **error);
101 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
102                                                         void                  *buffer,
103                                                         gsize                  count,
104                                                         int                    io_priority,
105                                                         GCancellable          *cancellable,
106                                                         GAsyncReadyCallback    callback,
107                                                         gpointer               user_data);
108 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
109                                                         GAsyncResult          *result,
110                                                         GError               **error);
/* Default implementations of the GBufferedInputStream fill virtual methods. */
111 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
112                                                         gssize                 count,
113                                                         GCancellable          *cancellable,
114                                                         GError               **error);
115 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
116                                                         gssize                 count,
117                                                         int                    io_priority,
118                                                         GCancellable          *cancellable,
119                                                         GAsyncReadyCallback    callback,
120                                                         gpointer               user_data);
121 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
122                                                         GAsyncResult          *result,
123                                                         GError               **error);
124
/* Moves the unread bytes to the start of the buffer. */
125 static void compact_buffer (GBufferedInputStream *stream);
126
/* Defines the GBufferedInputStream type with GFilterInputStream as its parent type. */
127 G_DEFINE_TYPE (GBufferedInputStream,
128                g_buffered_input_stream,
129                G_TYPE_FILTER_INPUT_STREAM)
130
131
132 static void
133 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
134 {
135   GObjectClass *object_class;
136   GInputStreamClass *istream_class;
137   GBufferedInputStreamClass *bstream_class;
138
139   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
140
141   object_class = G_OBJECT_CLASS (klass);
142   object_class->get_property = g_buffered_input_stream_get_property;
143   object_class->set_property = g_buffered_input_stream_set_property;
144   object_class->finalize     = g_buffered_input_stream_finalize;
145
146   istream_class = G_INPUT_STREAM_CLASS (klass);
147   istream_class->skip = g_buffered_input_stream_skip;
148   istream_class->skip_async  = g_buffered_input_stream_skip_async;
149   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
150   istream_class->read = g_buffered_input_stream_read;
151   istream_class->read_async  = g_buffered_input_stream_read_async;
152   istream_class->read_finish = g_buffered_input_stream_read_finish;
153
154   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
155   bstream_class->fill = g_buffered_input_stream_real_fill;
156   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
157   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
158   
159   g_object_class_install_property (object_class,
160                                    PROP_BUFSIZE,
161                                    g_param_spec_uint ("buffer-size",
162                                                       P_("Buffer Size"),
163                                                       P_("The size of the backend buffer"),
164                                                       1,
165                                                       G_MAXUINT,
166                                                       DEFAULT_BUFFER_SIZE,
167                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
168                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
169
170
171 }
172
173 /**
174  * g_buffered_input_stream_get_buffer_size:
175  * @stream: #GBufferedInputStream.
176  * 
177  * Gets the size of the input buffer.
178  * 
179  * Returns: the current buffer size, or %-1 on error.
180  **/
181 gsize
182 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
183 {
184   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
185
186   return stream->priv->len;
187 }
188
189 /**
190  * g_buffered_input_stream_set_buffer_size:
191  * @stream: #GBufferedInputStream.
192  * @size: a #gsize.
193  *
194  * Sets the size of the internal buffer of @stream to @size, or to the 
195  * size of the contents of the buffer. The buffer can never be resized 
196  * smaller than its current contents.
197  **/
198 void
199 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
200                                          gsize                  size)
201 {
202   GBufferedInputStreamPrivate *priv;
203   gsize in_buffer;
204   guint8 *buffer;
205   
206   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
207
208   priv = stream->priv;
209
210   if (priv->buffer)
211     {
212       in_buffer = priv->end - priv->pos;
213
214       /* Never resize smaller than current buffer contents */
215       size = MAX (size, in_buffer);
216
217       buffer = g_malloc (size);
218       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
219       priv->len = size;
220       priv->pos = 0;
221       priv->end = in_buffer;
222       g_free (priv->buffer);
223       priv->buffer = buffer;
224     }
225   else
226     {
227       priv->len = size;
228       priv->pos = 0;
229       priv->end = 0;
230       priv->buffer = g_malloc (size);
231     }
232 }
233
234 static void
235 g_buffered_input_stream_set_property (GObject      *object,
236                                       guint         prop_id,
237                                       const GValue *value,
238                                       GParamSpec   *pspec)
239 {
240   GBufferedInputStreamPrivate *priv;
241   GBufferedInputStream        *bstream;
242
243   bstream = G_BUFFERED_INPUT_STREAM (object);
244   priv = bstream->priv;
245
246   switch (prop_id) 
247     {
248     case PROP_BUFSIZE:
249       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
250       break;
251
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255     }
256
257 }
258
259 static void
260 g_buffered_input_stream_get_property (GObject    *object,
261                                       guint       prop_id,
262                                       GValue     *value,
263                                       GParamSpec *pspec)
264 {
265   GBufferedInputStreamPrivate *priv;
266   GBufferedInputStream        *bstream;
267
268   bstream = G_BUFFERED_INPUT_STREAM (object);
269   priv = bstream->priv;
270
271   switch (prop_id)
272     { 
273     case PROP_BUFSIZE:
274       g_value_set_uint (value, priv->len);
275       break;
276
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280     }
281 }
282
283 static void
284 g_buffered_input_stream_finalize (GObject *object)
285 {
286   GBufferedInputStreamPrivate *priv;
287   GBufferedInputStream        *stream;
288
289   stream = G_BUFFERED_INPUT_STREAM (object);
290   priv = stream->priv;
291
292   g_free (priv->buffer);
293
294   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
295     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
296 }
297
298 static void
299 g_buffered_input_stream_init (GBufferedInputStream *stream)
300 {
301   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
302                                               G_TYPE_BUFFERED_INPUT_STREAM,
303                                               GBufferedInputStreamPrivate);
304 }
305
306
307 /**
308  * g_buffered_input_stream_new:
309  * @base_stream: a #GInputStream.
310  * 
311  * Creates a new #GInputStream from the given @base_stream, with 
312  * a buffer set to the default size (4 kilobytes).
313  *
314  * Returns: a #GInputStream for the given @base_stream.
315  **/
316 GInputStream *
317 g_buffered_input_stream_new (GInputStream *base_stream)
318 {
319   GInputStream *stream;
320
321   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
322
323   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
324                          "base-stream", base_stream,
325                          NULL);
326
327   return stream;
328 }
329
330 /**
331  * g_buffered_input_stream_new_sized:
332  * @base_stream: a #GOutputStream.
333  * @size: a #gsize.
334  * 
335  * Creates a new #GBufferedInputStream from the given @base_stream, 
336  * with a buffer set to @size.
337  *
338  * Returns: a #GInputStream.
339  **/
340 GInputStream *
341 g_buffered_input_stream_new_sized (GInputStream *base_stream,
342                                    gsize         size)
343 {
344   GInputStream *stream;
345
346   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
347
348   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
349                          "base-stream", base_stream,
350                          "buffer-size", (guint)size,
351                          NULL);
352
353   return stream;
354 }
355
356 /**
357  * g_buffered_input_stream_fill:
358  * @stream: #GBufferedInputStream.
359  * @count: the number of bytes that will be read from the stream.
360  * @cancellable: optional #GCancellable object, %NULL to ignore.
361  * @error: location to store the error occuring, or %NULL to ignore.
362  *
363  * Tries to read @count bytes from the stream into the buffer. 
364  * Will block during this read.
365  * 
366  * If @count is zero, returns zero and does nothing. A value of @count
367  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
368  *
369  * On success, the number of bytes read into the buffer is returned.
370  * It is not an error if this is not the same as the requested size, as it
371  * can happen e.g. near the end of a file. Zero is returned on end of file
372  * (or if @count is zero),  but never otherwise.
373  *
374  * If @cancellable is not %NULL, then the operation can be cancelled by
375  * triggering the cancellable object from another thread. If the operation
376  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
377  * operation was partially finished when the operation was cancelled the
378  * partial result will be returned, without an error.
379  *
380  * On error -1 is returned and @error is set accordingly.
381  * 
382  * For the asynchronous, non-blocking, version of this function, see 
383  * g_buffered_input_stream_fill_async().
384  *
385  * Returns: the number of bytes read into @stream's buffer, up to @count, 
386  *     or -1 on error.
387  **/
388 gssize
389 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
390                               gssize                 count,
391                               GCancellable          *cancellable,
392                               GError               **error)
393 {
394   GBufferedInputStreamClass *class;
395   GInputStream *input_stream;
396   gssize res;
397
398   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
399   
400   input_stream = G_INPUT_STREAM (stream);
401   
402   if (g_input_stream_is_closed (input_stream))
403     {
404       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
405                    _("Stream is already closed"));
406       return -1;
407     }
408   
409   if (g_input_stream_has_pending (input_stream))
410     {
411       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
412                    _("Stream has outstanding operation"));
413       return -1;
414     }
415       
416   g_input_stream_set_pending (input_stream, TRUE);
417
418   if (cancellable)
419     g_push_current_cancellable (cancellable);
420   
421   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
422   res = class->fill (stream, count, cancellable, error);
423
424   if (cancellable)
425     g_pop_current_cancellable (cancellable);
426   
427   g_input_stream_set_pending (input_stream, FALSE);
428   
429   return res;
430 }
431
432 static void
433 async_fill_callback_wrapper (GObject      *source_object,
434                              GAsyncResult *res,
435                              gpointer      user_data)
436 {
437   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
438
439   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
440   (*stream->priv->outstanding_callback) (source_object, res, user_data);
441   g_object_unref (stream);
442 }
443
444 /**
445  * g_buffered_input_stream_fill_async:
446  * @stream: #GBufferedInputStream.
447  * @count: a #gssize.
448  * @io_priority: the <link linkend="gio-GIOScheduler">I/O priority</link> 
449  *     of the request.
450  * @cancellable: optional #GCancellable object
451  * @callback: a #GAsyncReadyCallback.
452  * @user_data: a #gpointer.
453  *
454  * Reads data into @stream's buffer asynchronously, up to @count size.
455  * @io_priority can be used to prioritize reads. For the synchronous
456  * version of this function, see g_buffered_input_stream_fill().
457  **/
458 void
459 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
460                                     gssize                count,
461                                     int                   io_priority,
462                                     GCancellable         *cancellable,
463                                     GAsyncReadyCallback   callback,
464                                     gpointer              user_data)
465 {
466   GBufferedInputStreamClass *class;
467   GSimpleAsyncResult *simple;
468
469   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
470
471   if (count == 0)
472     {
473       simple = g_simple_async_result_new (G_OBJECT (stream),
474                                           callback,
475                                           user_data,
476                                           g_buffered_input_stream_fill_async);
477       g_simple_async_result_complete_in_idle (simple);
478       g_object_unref (simple);
479       return;
480     }
481   
482   if (((gssize) count) < 0)
483     {
484       g_simple_async_report_error_in_idle (G_OBJECT (stream),
485                                            callback,
486                                            user_data,
487                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
488                                            _("Too large count value passed to g_input_stream_read_async"));
489       return;
490     }
491   
492   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
493     {
494       g_simple_async_report_error_in_idle (G_OBJECT (stream),
495                                            callback,
496                                            user_data,
497                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
498                                            _("Stream is already closed"));
499       return;
500     }
501     
502   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
503     {
504       g_simple_async_report_error_in_idle (G_OBJECT (stream),
505                                            callback,
506                                            user_data,
507                                            G_IO_ERROR, G_IO_ERROR_PENDING,
508                                            _("Stream has outstanding operation"));
509       return;
510     }
511
512   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
513   
514   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
515   stream->priv->outstanding_callback = callback;
516   g_object_ref (stream);
517   class->fill_async (stream, count, io_priority, cancellable,
518                      async_fill_callback_wrapper, user_data);
519 }
520
521 /**
522  * g_buffered_input_stream_fill_finish:
523  * @stream: a #GBufferedInputStream.
524  * @result: a #GAsyncResult.
525  * @error: a #GError.
526  *
527  * Finishes an asynchronous read.
528  * 
529  * Returns: a #gssize of the read stream, or %-1 on an error. 
530  **/
531 gssize
532 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
533                                      GAsyncResult          *result,
534                                      GError               **error)
535 {
536   GSimpleAsyncResult *simple;
537   GBufferedInputStreamClass *class;
538
539   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
541
542   if (G_IS_SIMPLE_ASYNC_RESULT (result))
543     {
544       simple = G_SIMPLE_ASYNC_RESULT (result);
545       if (g_simple_async_result_propagate_error (simple, error))
546         return -1;
547
548       /* Special case read of 0 bytes */
549       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
550         return 0;
551     }
552
553   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554   return class->fill_finish (stream, result, error);
555 }
556
557 /**
558  * g_buffered_input_stream_get_available:
559  * @stream: #GBufferedInputStream.
560  * 
561  * Gets the size of the available data within the stream.
562  * 
563  * Returns: size of the available stream. 
564  **/
565 gsize
566 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
567 {
568   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
569
570   return stream->priv->end - stream->priv->pos;
571 }
572
573 /**
574  * g_buffered_input_stream_peek:
575  * @stream: a #GBufferedInputStream.
576  * @buffer: a pointer to an allocated chunk of memory.
577  * @offset: a #gsize.
578  * @count: a #gsize.
579  * 
580  * Peeks in the buffer, copying data of size @count into @buffer, 
581  * offset @offset bytes.
582  * 
583  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
584  **/
585 gsize
586 g_buffered_input_stream_peek (GBufferedInputStream *stream,
587                               void                 *buffer,
588                               gsize                 offset,
589                               gsize                 count)
590 {
591   gsize available;
592   gsize end;
593   
594   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595   g_return_val_if_fail (buffer != NULL, -1);
596
597   available = g_buffered_input_stream_get_available (stream);
598
599   if (offset > available)
600     return 0;
601   
602   end = MIN (offset + count, available);
603   count = end - offset;
604   
605   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
606   return count;
607 }
608
609 /**
610  * g_buffered_input_stream_peek_buffer:
611  * @stream: a #GBufferedInputStream.
612  * @count: a #gsize to get the number of bytes available in the buffer.
613  *
614  * Returns the buffer with the currently available bytes. The returned
615  * buffer must not be modified and will become invalid when reading from
616  * the stream or filling the buffer.
617  *
618  * Returns: read-only buffer
619  **/
620 const void*
621 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
622                                      gsize                *count)
623 {
624   GBufferedInputStreamPrivate *priv;
625
626   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
627
628   priv = stream->priv;
629
630   if (count) 
631     *count = priv->end - priv->pos;
632
633   return priv->buffer + priv->pos;
634 }
635
636 static void
637 compact_buffer (GBufferedInputStream *stream)
638 {
639   GBufferedInputStreamPrivate *priv;
640   gsize current_size;
641
642   priv = stream->priv;
643
644   current_size = priv->end - priv->pos;
645   
646   g_memmove (priv->buffer, priv->buffer + priv->pos, current_size);
647   
648   priv->pos = 0;
649   priv->end = current_size;
650 }
651
652 static gssize
653 g_buffered_input_stream_real_fill (GBufferedInputStream  *stream,
654                                    gssize                 count,
655                                    GCancellable          *cancellable,
656                                    GError               **error)
657 {
658   GBufferedInputStreamPrivate *priv;
659   GInputStream *base_stream;
660   gssize nread;
661   gsize in_buffer;
662
663   priv = stream->priv;
664
665   if (count == -1)
666     count = priv->len;
667   
668   in_buffer = priv->end - priv->pos;
669
670   /* Never fill more than can fit in the buffer */
671   count = MIN (count, priv->len - in_buffer);
672
673   /* If requested length does not fit at end, compact */
674   if (priv->len - priv->end < count)
675     compact_buffer (stream);
676
677   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
678   nread = g_input_stream_read (base_stream,
679                                priv->buffer + priv->end,
680                                count,
681                                cancellable,
682                                error);
683
684   if (nread > 0)
685     priv->end += nread;
686   
687   return nread;
688 }
689
690 static gssize
691 g_buffered_input_stream_skip (GInputStream  *stream,
692                               gsize          count,
693                               GCancellable  *cancellable,
694                               GError       **error)
695 {
696   GBufferedInputStream        *bstream;
697   GBufferedInputStreamPrivate *priv;
698   GInputStream *base_stream;
699   gsize available, bytes_skipped;
700   gssize nread;
701
702   bstream = G_BUFFERED_INPUT_STREAM (stream);
703   priv = bstream->priv;
704
705   available = priv->end - priv->pos;
706
707   if (count <= available)
708     {
709       priv->pos += count;
710       return count;
711     }
712
713   /* Full request not available, skip all currently available and 
714    * request refill for more 
715    */
716   
717   priv->pos = 0;
718   priv->end = 0;
719   bytes_skipped = available;
720   count -= available;
721
722   if (bytes_skipped > 0)
723     error = NULL; /* Ignore further errors if we already read some data */
724   
725   if (count > priv->len)
726     {
727       /* Large request, shortcut buffer */
728       
729       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
730
731       nread = g_input_stream_skip (base_stream,
732                                    count,
733                                    cancellable,
734                                    error);
735       
736       if (nread < 0 && bytes_skipped == 0)
737         return -1;
738       
739       if (nread > 0)
740         bytes_skipped += nread;
741       
742       return bytes_skipped;
743     }
744   
745   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
746   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
747   g_input_stream_set_pending (stream, TRUE); /* enable again */
748   
749   if (nread < 0)
750     {
751       if (bytes_skipped == 0)
752         return -1;
753       else
754         return bytes_skipped;
755     }
756   
757   available = priv->end - priv->pos;
758   count = MIN (count, available);
759   
760   bytes_skipped += count;
761   priv->pos += count;
762   
763   return bytes_skipped;
764 }
765
766 static gssize
767 g_buffered_input_stream_read (GInputStream *stream,
768                               void         *buffer,
769                               gsize         count,
770                               GCancellable *cancellable,
771                               GError      **error)
772 {
773   GBufferedInputStream        *bstream;
774   GBufferedInputStreamPrivate *priv;
775   GInputStream *base_stream;
776   gsize available, bytes_read;
777   gssize nread;
778
779   bstream = G_BUFFERED_INPUT_STREAM (stream);
780   priv = bstream->priv;
781
782   available = priv->end - priv->pos;
783
784   if (count <= available)
785     {
786       memcpy (buffer, priv->buffer + priv->pos, count);
787       priv->pos += count;
788       return count;
789     }
790   
791   /* Full request not available, read all currently availbile and request refill for more */
792   
793   memcpy (buffer, priv->buffer + priv->pos, available);
794   priv->pos = 0;
795   priv->end = 0;
796   bytes_read = available;
797   count -= available;
798
799   if (bytes_read > 0)
800     error = NULL; /* Ignore further errors if we already read some data */
801   
802   if (count > priv->len)
803     {
804       /* Large request, shortcut buffer */
805       
806       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
807
808       nread = g_input_stream_read (base_stream,
809                                    (char *)buffer + bytes_read,
810                                    count,
811                                    cancellable,
812                                    error);
813       
814       if (nread < 0 && bytes_read == 0)
815         return -1;
816       
817       if (nread > 0)
818         bytes_read += nread;
819       
820       return bytes_read;
821     }
822   
823   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
824   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
825   g_input_stream_set_pending (stream, TRUE); /* enable again */
826   if (nread < 0)
827     {
828       if (bytes_read == 0)
829         return -1;
830       else
831         return bytes_read;
832     }
833   
834   available = priv->end - priv->pos;
835   count = MIN (count, available);
836   
837   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
838   bytes_read += count;
839   priv->pos += count;
840   
841   return bytes_read;
842 }
843
844 /**
845  * g_buffered_input_stream_read_byte:
846  * @stream: #GBufferedInputStream.
847  * @cancellable: optional #GCancellable object, %NULL to ignore.
848  * @error: location to store the error occuring, or %NULL to ignore.
849  *
850  * Tries to read a single byte from the stream or the buffer. Will block
851  * during this read.
852  *
853  * On success, the byte read from the stream is returned. On end of stream
854  * -1 is returned but it's not an exceptional error and @error is not set.
855  * 
856  * If @cancellable is not %NULL, then the operation can be cancelled by
857  * triggering the cancellable object from another thread. If the operation
858  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
859  * operation was partially finished when the operation was cancelled the
860  * partial result will be returned, without an error.
861  *
862  * On error -1 is returned and @error is set accordingly.
863  * 
864  * Returns: the byte read from the @stream, or -1 on end of stream or error.
865  **/
866 int
867 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
868                                    GCancellable          *cancellable,
869                                    GError               **error)
870 {
871   GBufferedInputStreamPrivate *priv;
872   GInputStream *input_stream;
873   gsize available;
874   gssize nread;
875
876   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
877
878   priv = stream->priv;
879   input_stream = G_INPUT_STREAM (stream);
880
881   if (g_input_stream_is_closed (input_stream))
882     {
883       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
884        _("Stream is already closed"));
885       return -1;
886     }
887
888   if (g_input_stream_has_pending (input_stream))
889     {
890       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
891        _("Stream has outstanding operation"));
892       return -1;
893     }
894
895   available = priv->end - priv->pos;
896
897   if (available < 1)
898     return priv->buffer[priv->pos++];
899
900   /* Byte not available, request refill for more */
901
902   if (cancellable)
903     g_push_current_cancellable (cancellable);
904
905   priv->pos = 0;
906   priv->end = 0;
907
908   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
909
910   if (cancellable)
911     g_pop_current_cancellable (cancellable);
912
913   if (nread <= 0)
914     return -1; /* error or end of stream */
915
916   return priv->buffer[priv->pos++];
917 }
918
919 /* ************************** */
920 /* Async stuff implementation */
921 /* ************************** */
922
923 static void
924 fill_async_callback (GObject      *source_object,
925                      GAsyncResult *result,
926                      gpointer      user_data)
927 {
928   GError *error;
929   gssize res;
930   GSimpleAsyncResult *simple;
931
932   simple = user_data;
933   
934   error = NULL;
935   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
936                                     result, &error);
937
938   g_simple_async_result_set_op_res_gssize (simple, res);
939   if (res == -1)
940     {
941       g_simple_async_result_set_from_error (simple, error);
942       g_error_free (error);
943     }
944   
945   /* Complete immediately, not in idle, since we're already in a mainloop callout */
946   g_simple_async_result_complete (simple);
947   g_object_unref (simple);
948 }
949
950 static void
951 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
952                                          gssize                count,
953                                          int                   io_priority,
954                                          GCancellable         *cancellable,
955                                          GAsyncReadyCallback   callback,
956                                          gpointer              user_data)
957 {
958   GBufferedInputStreamPrivate *priv;
959   GInputStream *base_stream;
960   GSimpleAsyncResult *simple;
961   gsize in_buffer;
962
963   priv = stream->priv;
964
965   if (count == -1)
966     count = priv->len;
967   
968   in_buffer = priv->end - priv->pos;
969
970   /* Never fill more than can fit in the buffer */
971   count = MIN (count, priv->len - in_buffer);
972
973   /* If requested length does not fit at end, compact */
974   if (priv->len - priv->end < count)
975     compact_buffer (stream);
976
977   simple = g_simple_async_result_new (G_OBJECT (stream),
978                                       callback, user_data,
979                                       g_buffered_input_stream_real_fill_async);
980   
981   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
982   g_input_stream_read_async (base_stream,
983                              priv->buffer + priv->end,
984                              count,
985                              io_priority,
986                              cancellable,
987                              fill_async_callback,
988                              simple);
989 }
990
991 static gssize
992 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
993                                           GAsyncResult         *result,
994                                           GError              **error)
995 {
996   GSimpleAsyncResult *simple;
997   gssize nread;
998
999   simple = G_SIMPLE_ASYNC_RESULT (result);
1000   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1001   
1002   nread = g_simple_async_result_get_op_res_gssize (simple);
1003   return nread;
1004 }
1005
1006 typedef struct {
1007   gssize bytes_read;
1008   gssize count;
1009   void *buffer;
1010 } ReadAsyncData;
1011
1012 static void 
1013 free_read_async_data (gpointer _data)
1014 {
1015   ReadAsyncData *data = _data;
1016   g_slice_free (ReadAsyncData, data);
1017 }
1018
1019 static void
1020 large_read_callback (GObject *source_object,
1021                      GAsyncResult *result,
1022                      gpointer user_data)
1023 {
1024   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1025   ReadAsyncData *data;
1026   GError *error;
1027   gssize nread;
1028
1029   data = g_simple_async_result_get_op_res_gpointer (simple);
1030   
1031   error = NULL;
1032   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1033                                       result, &error);
1034
1035   /* Only report the error if we've not already read some data */
1036   if (nread < 0 && data->bytes_read == 0)
1037     g_simple_async_result_set_from_error (simple, error);
1038   
1039   if (nread > 0)
1040     data->bytes_read += nread;
1041   
1042   if (error)
1043     g_error_free (error);
1044   
1045   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1046   g_simple_async_result_complete (simple);
1047   g_object_unref (simple);
1048 }
1049
1050 static void
1051 read_fill_buffer_callback (GObject *source_object,
1052                            GAsyncResult *result,
1053                            gpointer user_data)
1054 {
1055   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1056   GBufferedInputStream *bstream;
1057   GBufferedInputStreamPrivate *priv;
1058   ReadAsyncData *data;
1059   GError *error;
1060   gssize nread;
1061   gsize available;
1062
1063   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1064   priv = bstream->priv;
1065   
1066   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1067   
1068   data = g_simple_async_result_get_op_res_gpointer (simple);
1069   
1070   error = NULL;
1071   nread = g_buffered_input_stream_fill_finish (bstream,
1072                                                result, &error);
1073   
1074   if (nread < 0 && data->bytes_read == 0)
1075     g_simple_async_result_set_from_error (simple, error);
1076
1077
1078   if (nread > 0)
1079     {
1080       available = priv->end - priv->pos;
1081       data->count = MIN (data->count, available);
1082       
1083       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1084       data->bytes_read += data->count;
1085       priv->pos += data->count;
1086     }
1087
1088   if (error)
1089     g_error_free (error);
1090   
1091   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1092   g_simple_async_result_complete (simple);
1093   g_object_unref (simple);
1094 }
1095
1096 static void
1097 g_buffered_input_stream_read_async (GInputStream              *stream,
1098                                     void                      *buffer,
1099                                     gsize                      count,
1100                                     int                        io_priority,
1101                                     GCancellable              *cancellable,
1102                                     GAsyncReadyCallback        callback,
1103                                     gpointer                   user_data)
1104 {
1105   GBufferedInputStream *bstream;
1106   GBufferedInputStreamPrivate *priv;
1107   GInputStream *base_stream;
1108   gsize available;
1109   GSimpleAsyncResult *simple;
1110   ReadAsyncData *data;
1111
1112   bstream = G_BUFFERED_INPUT_STREAM (stream);
1113   priv = bstream->priv;
1114
1115   data = g_slice_new (ReadAsyncData);
1116   data->buffer = buffer;
1117   data->bytes_read = 0;
1118   simple = g_simple_async_result_new (G_OBJECT (stream),
1119                                       callback, user_data,
1120                                       g_buffered_input_stream_read_async);
1121   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1122   
1123   available = priv->end - priv->pos;
1124   
1125   if (count <= available)
1126     {
1127       memcpy (buffer, priv->buffer + priv->pos, count);
1128       priv->pos += count;
1129       data->bytes_read = count;
1130       
1131       g_simple_async_result_complete_in_idle (simple);
1132       g_object_unref (simple);
1133       return;
1134     }
1135
1136
1137   /* Full request not available, read all currently availbile and request refill for more */
1138   
1139   memcpy (buffer, priv->buffer + priv->pos, available);
1140   priv->pos = 0;
1141   priv->end = 0;
1142   
1143   count -= available;
1144   
1145   data->bytes_read = available;
1146   data->count = count;
1147
1148   if (count > priv->len)
1149     {
1150       /* Large request, shortcut buffer */
1151       
1152       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1153       
1154       g_input_stream_read_async (base_stream,
1155                                  (char *)buffer + data->bytes_read,
1156                                  count,
1157                                  io_priority, cancellable,
1158                                  large_read_callback,
1159                                  simple);
1160     }
1161   else
1162     {
1163       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1164       g_buffered_input_stream_fill_async (bstream, priv->len,
1165                                           io_priority, cancellable,
1166                                           read_fill_buffer_callback, simple);
1167     }
1168 }
1169
1170 static gssize
1171 g_buffered_input_stream_read_finish (GInputStream   *stream,
1172                                      GAsyncResult   *result,
1173                                      GError        **error)
1174 {
1175   GSimpleAsyncResult *simple;
1176   ReadAsyncData *data;
1177   
1178   simple = G_SIMPLE_ASYNC_RESULT (result);
1179   
1180   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1181
1182   data = g_simple_async_result_get_op_res_gpointer (simple);
1183   
1184   return data->bytes_read;
1185 }
1186
1187 typedef struct {
1188   gssize bytes_skipped;
1189   gssize count;
1190 } SkipAsyncData;
1191
1192 static void 
1193 free_skip_async_data (gpointer _data)
1194 {
1195   SkipAsyncData *data = _data;
1196   g_slice_free (SkipAsyncData, data);
1197 }
1198
1199 static void
1200 large_skip_callback (GObject *source_object,
1201                      GAsyncResult *result,
1202                      gpointer user_data)
1203 {
1204   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1205   SkipAsyncData *data;
1206   GError *error;
1207   gssize nread;
1208
1209   data = g_simple_async_result_get_op_res_gpointer (simple);
1210   
1211   error = NULL;
1212   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1213                                       result, &error);
1214
1215   /* Only report the error if we've not already read some data */
1216   if (nread < 0 && data->bytes_skipped == 0)
1217     g_simple_async_result_set_from_error (simple, error);
1218   
1219   if (nread > 0)
1220     data->bytes_skipped += nread;
1221   
1222   if (error)
1223     g_error_free (error);
1224   
1225   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1226   g_simple_async_result_complete (simple);
1227   g_object_unref (simple);
1228 }
1229
1230 static void
1231 skip_fill_buffer_callback (GObject *source_object,
1232                            GAsyncResult *result,
1233                            gpointer user_data)
1234 {
1235   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1236   GBufferedInputStream *bstream;
1237   GBufferedInputStreamPrivate *priv;
1238   SkipAsyncData *data;
1239   GError *error;
1240   gssize nread;
1241   gsize available;
1242
1243   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1244   priv = bstream->priv;
1245   
1246   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1247   
1248   data = g_simple_async_result_get_op_res_gpointer (simple);
1249   
1250   error = NULL;
1251   nread = g_buffered_input_stream_fill_finish (bstream,
1252                                                result, &error);
1253   
1254   if (nread < 0 && data->bytes_skipped == 0)
1255     g_simple_async_result_set_from_error (simple, error);
1256
1257
1258   if (nread > 0)
1259     {
1260       available = priv->end - priv->pos;
1261       data->count = MIN (data->count, available);
1262       
1263       data->bytes_skipped += data->count;
1264       priv->pos += data->count;
1265     }
1266
1267   if (error)
1268     g_error_free (error);
1269   
1270   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1271   g_simple_async_result_complete (simple);
1272   g_object_unref (simple);
1273 }
1274
1275 static void
1276 g_buffered_input_stream_skip_async (GInputStream              *stream,
1277                                     gsize                      count,
1278                                     int                        io_priority,
1279                                     GCancellable              *cancellable,
1280                                     GAsyncReadyCallback        callback,
1281                                     gpointer                   user_data)
1282 {
1283   GBufferedInputStream *bstream;
1284   GBufferedInputStreamPrivate *priv;
1285   GInputStream *base_stream;
1286   gsize available;
1287   GSimpleAsyncResult *simple;
1288   SkipAsyncData *data;
1289
1290   bstream = G_BUFFERED_INPUT_STREAM (stream);
1291   priv = bstream->priv;
1292
1293   data = g_slice_new (SkipAsyncData);
1294   data->bytes_skipped = 0;
1295   simple = g_simple_async_result_new (G_OBJECT (stream),
1296                                       callback, user_data,
1297                                       g_buffered_input_stream_skip_async);
1298   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1299   
1300   available = priv->end - priv->pos;
1301   
1302   if (count <= available)
1303     {
1304       priv->pos += count;
1305       data->bytes_skipped = count;
1306       
1307       g_simple_async_result_complete_in_idle (simple);
1308       g_object_unref (simple);
1309       return;
1310     }
1311
1312
1313   /* Full request not available, skip all currently availbile and request refill for more */
1314   
1315   priv->pos = 0;
1316   priv->end = 0;
1317   
1318   count -= available;
1319   
1320   data->bytes_skipped = available;
1321   data->count = count;
1322
1323   if (count > priv->len)
1324     {
1325       /* Large request, shortcut buffer */
1326       
1327       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1328       
1329       g_input_stream_skip_async (base_stream,
1330                                  count,
1331                                  io_priority, cancellable,
1332                                  large_skip_callback,
1333                                  simple);
1334     }
1335   else
1336     {
1337       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1338       g_buffered_input_stream_fill_async (bstream, priv->len,
1339                                           io_priority, cancellable,
1340                                           skip_fill_buffer_callback, simple);
1341     }
1342 }
1343
1344 static gssize
1345 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1346                                      GAsyncResult   *result,
1347                                      GError        **error)
1348 {
1349   GSimpleAsyncResult *simple;
1350   SkipAsyncData *data;
1351   
1352   simple = G_SIMPLE_ASYNC_RESULT (result);
1353   
1354   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1355
1356   data = g_simple_async_result_get_op_res_gpointer (simple);
1357   
1358   return data->bytes_skipped;
1359 }
1360
1361
1362 #define __G_BUFFERED_INPUT_STREAM_C__
1363 #include "gioaliasdef.c"