Another round of trivial doc fixes
[platform/upstream/glib.git] / gio / gbufferedinputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  * Copyright (C) 2007 Jürg Billeter
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General
17  * Public License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
19  * Boston, MA 02111-1307, USA.
20  *
21  * Author: Christian Kellner <gicmo@gnome.org> 
22  */
23
24 #include <config.h>
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include <string.h>
29 #include "glibintl.h"
30
31 /**
32  * SECTION:gbufferedinputstream
33  * @short_description: Buffered Input Stream
34  * @see_also: #GFilterInputStream, #GInputStream
35  * 
36  * Buffered input stream implements #GFilterInputStream and provides 
37  * for buffered reads. 
38  * 
39  * By default, #GBufferedInputStream's buffer size is set at 4 kilobytes.
40  * 
41  * To create a buffered input stream, use g_buffered_input_stream_new(), or 
42  * g_buffered_input_stream_new_sized() to specify the buffer's size at construction.
43  * 
44  * To get the size of a buffer within a buffered input stream, use 
45  * g_buffered_input_stream_get_buffer_size(). To change the size of a 
46  * buffered input stream's buffer, use g_buffered_input_stream_set_buffer_size(). 
47  * Note: the buffer's size cannot be reduced below the size of the data within the
48  * buffer.
49  *
50  **/
51
52
53
54 #define DEFAULT_BUFFER_SIZE 4096
55
56 struct _GBufferedInputStreamPrivate {
57   guint8 *buffer;
58   gsize   len;
59   gsize   pos;
60   gsize   end;
61   GAsyncReadyCallback outstanding_callback;
62 };
63
/* GObject property identifiers used by set_property/get_property. */
enum
{
  PROP_0,      /* first enumerator; never installed as a property here */
  PROP_BUFSIZE /* the "buffer-size" property */
};
68
69 static void g_buffered_input_stream_set_property  (GObject      *object,
70                                                    guint         prop_id,
71                                                    const GValue *value,
72                                                    GParamSpec   *pspec);
73
74 static void g_buffered_input_stream_get_property  (GObject      *object,
75                                                    guint         prop_id,
76                                                    GValue       *value,
77                                                    GParamSpec   *pspec);
78 static void g_buffered_input_stream_finalize      (GObject *object);
79
80
81 static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
82                                                         gsize                  count,
83                                                         GCancellable          *cancellable,
84                                                         GError               **error);
85 static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
86                                                         gsize                  count,
87                                                         int                    io_priority,
88                                                         GCancellable          *cancellable,
89                                                         GAsyncReadyCallback    callback,
90                                                         gpointer               user_data);
91 static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
92                                                         GAsyncResult          *result,
93                                                         GError               **error);
94 static gssize g_buffered_input_stream_read             (GInputStream          *stream,
95                                                         void                  *buffer,
96                                                         gsize                  count,
97                                                         GCancellable          *cancellable,
98                                                         GError               **error);
99 static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
100                                                         void                  *buffer,
101                                                         gsize                  count,
102                                                         int                    io_priority,
103                                                         GCancellable          *cancellable,
104                                                         GAsyncReadyCallback    callback,
105                                                         gpointer               user_data);
106 static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
107                                                         GAsyncResult          *result,
108                                                         GError               **error);
109 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
110                                                         gssize                 count,
111                                                         GCancellable          *cancellable,
112                                                         GError               **error);
113 static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
114                                                         gssize                 count,
115                                                         int                    io_priority,
116                                                         GCancellable          *cancellable,
117                                                         GAsyncReadyCallback    callback,
118                                                         gpointer               user_data);
119 static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
120                                                         GAsyncResult          *result,
121                                                         GError               **error);
122
123 static void compact_buffer (GBufferedInputStream *stream);
124
125 G_DEFINE_TYPE (GBufferedInputStream,
126                g_buffered_input_stream,
127                G_TYPE_FILTER_INPUT_STREAM)
128
129
130 static void
131 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
132 {
133   GObjectClass *object_class;
134   GInputStreamClass *istream_class;
135   GBufferedInputStreamClass *bstream_class;
136
137   g_type_class_add_private (klass, sizeof (GBufferedInputStreamPrivate));
138
139   object_class = G_OBJECT_CLASS (klass);
140   object_class->get_property = g_buffered_input_stream_get_property;
141   object_class->set_property = g_buffered_input_stream_set_property;
142   object_class->finalize     = g_buffered_input_stream_finalize;
143
144   istream_class = G_INPUT_STREAM_CLASS (klass);
145   istream_class->skip = g_buffered_input_stream_skip;
146   istream_class->skip_async  = g_buffered_input_stream_skip_async;
147   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
148   istream_class->read = g_buffered_input_stream_read;
149   istream_class->read_async  = g_buffered_input_stream_read_async;
150   istream_class->read_finish = g_buffered_input_stream_read_finish;
151
152   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
153   bstream_class->fill = g_buffered_input_stream_real_fill;
154   bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
155   bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
156   
157   g_object_class_install_property (object_class,
158                                    PROP_BUFSIZE,
159                                    g_param_spec_uint ("buffer-size",
160                                                       P_("Buffer Size"),
161                                                       P_("The size of the backend buffer"),
162                                                       1,
163                                                       G_MAXUINT,
164                                                       DEFAULT_BUFFER_SIZE,
165                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
166                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
167
168
169 }
170
171 /**
172  * g_buffered_input_stream_get_buffer_size:
173  * @stream: #GBufferedInputStream.
174  * 
175  * Gets the size of the input buffer.
176  * 
177  * Returns: the current buffer size, or %-1 on error.
178  **/
179 gsize
180 g_buffered_input_stream_get_buffer_size (GBufferedInputStream  *stream)
181 {
182   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
183
184   return stream->priv->len;
185 }
186
187 /**
188  * g_buffered_input_stream_set_buffer_size:
189  * @stream: #GBufferedInputStream.
190  * @size: a #gsize.
191  *
192  * Sets the size of the internal buffer of @stream to @size, or to the 
193  * size of the contents of the buffer. The buffer can never be resized 
194  * smaller than its current contents.
195  **/
196 void
197 g_buffered_input_stream_set_buffer_size (GBufferedInputStream  *stream,
198                                          gsize                  size)
199 {
200   GBufferedInputStreamPrivate *priv;
201   gsize in_buffer;
202   guint8 *buffer;
203   
204   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
205
206   priv = stream->priv;
207
208   if (priv->buffer)
209     {
210       in_buffer = priv->end - priv->pos;
211
212       /* Never resize smaller than current buffer contents */
213       size = MAX (size, in_buffer);
214
215       buffer = g_malloc (size);
216       memcpy (buffer, priv->buffer + priv->pos, in_buffer);
217       priv->len = size;
218       priv->pos = 0;
219       priv->end = in_buffer;
220       g_free (priv->buffer);
221       priv->buffer = buffer;
222     }
223   else
224     {
225       priv->len = size;
226       priv->pos = 0;
227       priv->end = 0;
228       priv->buffer = g_malloc (size);
229     }
230 }
231
232 static void
233 g_buffered_input_stream_set_property (GObject         *object,
234                                       guint            prop_id,
235                                       const GValue    *value,
236                                       GParamSpec      *pspec)
237 {
238   GBufferedInputStreamPrivate *priv;
239   GBufferedInputStream        *bstream;
240
241   bstream = G_BUFFERED_INPUT_STREAM (object);
242   priv = bstream->priv;
243
244   switch (prop_id) 
245     {
246     case PROP_BUFSIZE:
247       g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
248       break;
249
250     default:
251       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
252       break;
253     }
254
255 }
256
257 static void
258 g_buffered_input_stream_get_property (GObject    *object,
259                                       guint       prop_id,
260                                       GValue     *value,
261                                       GParamSpec *pspec)
262 {
263   GBufferedInputStreamPrivate *priv;
264   GBufferedInputStream        *bstream;
265
266   bstream = G_BUFFERED_INPUT_STREAM (object);
267   priv = bstream->priv;
268
269   switch (prop_id)
270     { 
271     
272     case PROP_BUFSIZE:
273       g_value_set_uint (value, priv->len);
274       break;
275     default:
276       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
277       break;
278     }
279
280 }
281
282 static void
283 g_buffered_input_stream_finalize (GObject *object)
284 {
285   GBufferedInputStreamPrivate *priv;
286   GBufferedInputStream        *stream;
287
288   stream = G_BUFFERED_INPUT_STREAM (object);
289   priv = stream->priv;
290
291   g_free (priv->buffer);
292
293   if (G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize)
294     (*G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize) (object);
295 }
296
297 static void
298 g_buffered_input_stream_init (GBufferedInputStream *stream)
299 {
300   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
301                                               G_TYPE_BUFFERED_INPUT_STREAM,
302                                               GBufferedInputStreamPrivate);
303 }
304
305
306 /**
307  * g_buffered_input_stream_new:
308  * @base_stream: a #GInputStream.
309  * 
310  * Creates a new #GInputStream from the given @base_stream, with 
311  * a buffer set to the default size (4 kilobytes).
312  *
313  * Returns: a #GInputStream for the given @base_stream.
314  **/
315 GInputStream *
316 g_buffered_input_stream_new (GInputStream *base_stream)
317 {
318   GInputStream *stream;
319
320   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
321
322   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
323                          "base-stream", base_stream,
324                          NULL);
325
326   return stream;
327 }
328
329 /**
330  * g_buffered_input_stream_new_sized:
331  * @base_stream: a #GOutputStream.
332  * @size: a #gsize.
333  * 
334  * Creates a new #GBufferedInputStream from the given @base_stream, with 
335  * a buffer set to @size.
336  *
337  * Returns: a #GInputStream.
338  **/
339 GInputStream *
340 g_buffered_input_stream_new_sized (GInputStream *base_stream,
341                                    gsize         size)
342 {
343   GInputStream *stream;
344
345   g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
346
347   stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
348                          "base-stream", base_stream,
349                          "buffer-size", (guint)size,
350                          NULL);
351
352   return stream;
353 }
354
355 /**
356  * g_buffered_input_stream_fill:
357  * @stream: #GBufferedInputStream.
358  * @count: the number of bytes that will be read from the stream.
359  * @cancellable: optional #GCancellable object, %NULL to ignore.
360  * @error: location to store the error occuring, or %NULL to ignore.
361  *
362  * Tries to read @count bytes from the stream into the buffer. 
363  * Will block during this read.
364  * 
365  * If @count is zero, returns zero and does nothing. A value of @count
366  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
367  *
368  * On success, the number of bytes read into the buffer is returned.
369  * It is not an error if this is not the same as the requested size, as it
370  * can happen e.g. near the end of a file. Zero is returned on end of file
371  * (or if @count is zero),  but never otherwise.
372  *
373  * If @cancellable is not %NULL, then the operation can be cancelled by
374  * triggering the cancellable object from another thread. If the operation
375  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
376  * operation was partially finished when the operation was cancelled the
377  * partial result will be returned, without an error.
378  *
379  * On error -1 is returned and @error is set accordingly.
380  * 
381  * For the asynchronous, non-blocking, version of this function, see 
382  * g_buffered_input_stream_fill_async().
383  *
384  * Returns: the number of bytes read into @stream's buffer, up to @count, 
385  * or -1 on error.
386  **/
387 gssize
388 g_buffered_input_stream_fill (GBufferedInputStream  *stream,
389                               gssize                 count,
390                               GCancellable          *cancellable,
391                               GError               **error)
392 {
393   GBufferedInputStreamClass *class;
394   GInputStream *input_stream;
395   gssize res;
396
397   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
398   
399   input_stream = G_INPUT_STREAM (stream);
400   
401   if (g_input_stream_is_closed (input_stream))
402     {
403       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
404                    _("Stream is already closed"));
405       return -1;
406     }
407   
408   if (g_input_stream_has_pending (input_stream))
409     {
410       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
411                    _("Stream has outstanding operation"));
412       return -1;
413     }
414       
415   g_input_stream_set_pending (input_stream, TRUE);
416
417   if (cancellable)
418     g_push_current_cancellable (cancellable);
419   
420   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
421   res = class->fill (stream, count, cancellable, error);
422
423   if (cancellable)
424     g_pop_current_cancellable (cancellable);
425   
426   g_input_stream_set_pending (input_stream, FALSE);
427   
428   return res;
429 }
430
431 static void
432 async_fill_callback_wrapper (GObject *source_object,
433                              GAsyncResult *res,
434                              gpointer      user_data)
435 {
436   GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
437
438   g_input_stream_set_pending (G_INPUT_STREAM (stream), FALSE);
439   (*stream->priv->outstanding_callback) (source_object, res, user_data);
440   g_object_unref (stream);
441 }
442
443 /**
444  * g_buffered_input_stream_fill_async:
445  * @stream: #GBufferedInputStream.
446  * @count: a #gssize.
447  * @io_priority: the io priority of the request. the io priority of the request.
448  * @cancellable: optional #GCancellable object
449  * @callback: a #GAsyncReadyCallback.
450  * @user_data: a #gpointer.
451  *
452  * Reads data into @stream's buffer asynchronously, up to @count size.
453  * @io_priority can be used to prioritize reads. For the synchronous
454  * version of this function, see g_buffered_input_stream_fill().
455  * 
456  **/
457
458 void
459 g_buffered_input_stream_fill_async (GBufferedInputStream  *stream,
460                                     gssize                 count,
461                                     int                    io_priority,
462                                     GCancellable          *cancellable,
463                                     GAsyncReadyCallback    callback,
464                                     gpointer               user_data)
465 {
466   GBufferedInputStreamClass *class;
467   GSimpleAsyncResult *simple;
468
469   g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
470
471   if (count == 0)
472     {
473       simple = g_simple_async_result_new (G_OBJECT (stream),
474                                           callback,
475                                           user_data,
476                                           g_buffered_input_stream_fill_async);
477       g_simple_async_result_complete_in_idle (simple);
478       g_object_unref (simple);
479       return;
480     }
481   
482   if (((gssize) count) < 0)
483     {
484       g_simple_async_report_error_in_idle (G_OBJECT (stream),
485                                            callback,
486                                            user_data,
487                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
488                                            _("Too large count value passed to g_input_stream_read_async"));
489       return;
490     }
491   
492   if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
493     {
494       g_simple_async_report_error_in_idle (G_OBJECT (stream),
495                                            callback,
496                                            user_data,
497                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
498                                            _("Stream is already closed"));
499       return;
500     }
501     
502   if (g_input_stream_has_pending (G_INPUT_STREAM (stream)))
503     {
504       g_simple_async_report_error_in_idle (G_OBJECT (stream),
505                                            callback,
506                                            user_data,
507                                            G_IO_ERROR, G_IO_ERROR_PENDING,
508                                            _("Stream has outstanding operation"));
509       return;
510     }
511
512   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
513   
514   g_input_stream_set_pending (G_INPUT_STREAM (stream), TRUE);
515   stream->priv->outstanding_callback = callback;
516   g_object_ref (stream);
517   class->fill_async (stream, count, io_priority, cancellable,
518                      async_fill_callback_wrapper, user_data);
519 }
520
521 /**
522  * g_buffered_input_stream_fill_finish:
523  * @stream: a #GBufferedInputStream.
524  * @result: a #GAsyncResult.
525  * @error: a #GError.
526  *
527  * Finishes an asynchronous read.
528  * 
529  * Returns: a #gssize of the read stream, or %-1 on an error. 
530  **/
531 gssize
532 g_buffered_input_stream_fill_finish (GBufferedInputStream  *stream,
533                                      GAsyncResult          *result,
534                                      GError               **error)
535 {
536   GSimpleAsyncResult *simple;
537   GBufferedInputStreamClass *class;
538
539   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
540   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
541
542   if (G_IS_SIMPLE_ASYNC_RESULT (result))
543     {
544       simple = G_SIMPLE_ASYNC_RESULT (result);
545       if (g_simple_async_result_propagate_error (simple, error))
546         return -1;
547
548       /* Special case read of 0 bytes */
549       if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
550         return 0;
551     }
552
553   class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
554   return class->fill_finish (stream, result, error);
555 }
556
557 /**
558  * g_buffered_input_stream_get_available:
559  * @stream: #GBufferedInputStream.
560  * 
561  * Gets the size of the available data within the stream.
562  * 
563  * Returns: size of the available stream. 
564  **/
565 gsize
566 g_buffered_input_stream_get_available (GBufferedInputStream  *stream)
567 {
568   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
569
570   return stream->priv->end - stream->priv->pos;
571 }
572
573 /**
574  * g_buffered_input_stream_peek:
575  * @stream: a #GBufferedInputStream.
576  * @buffer: a pointer to an allocated chunk of memory.
577  * @offset: a #gsize.
578  * @count: a #gsize.
579  * 
580  * Peeks in the buffer, copying data of size @count into @buffer, offset
581  * @offset bytes.
582  * 
583  * Returns: a #gsize of the number of bytes peeked, or %-1 on error.
584  **/
585 gsize
586 g_buffered_input_stream_peek (GBufferedInputStream  *stream,
587                               void                  *buffer,
588                               gsize                  offset,
589                               gsize                  count)
590 {
591   gsize available;
592   gsize end;
593   
594   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
595   g_return_val_if_fail (buffer != NULL, -1);
596
597   available = g_buffered_input_stream_get_available (stream);
598
599   if (offset > available)
600     return 0;
601   
602   end = MIN (offset + count, available);
603   count = end - offset;
604   
605   memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
606   return count;
607 }
608
609 /**
610  * g_buffered_input_stream_peek_buffer:
611  * @stream: a #GBufferedInputStream.
612  * @count: a #gsize to get the number of bytes available in the buffer.
613  *
614  * Returns the buffer with the currently available bytes. The returned
615  * buffer must not be modified and will become invalid when reading from
616  * the stream or filling the buffer.
617  *
618  * Returns: read-only buffer
619  **/
620 const void*
621 g_buffered_input_stream_peek_buffer (GBufferedInputStream  *stream,
622                                      gsize                 *count)
623 {
624   GBufferedInputStreamPrivate *priv;
625
626   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
627
628   priv = stream->priv;
629
630   if (count) {
631     *count = priv->end - priv->pos;
632   }
633
634   return priv->buffer + priv->pos;
635 }
636
637 static void
638 compact_buffer (GBufferedInputStream *stream)
639 {
640   GBufferedInputStreamPrivate *priv;
641   gsize current_size;
642
643   priv = stream->priv;
644
645   current_size = priv->end - priv->pos;
646   
647   g_memmove (priv->buffer,
648              priv->buffer + priv->pos,
649              current_size);
650   
651   priv->pos = 0;
652   priv->end = current_size;
653 }
654
655 static gssize
656 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
657                                    gssize                 count,
658                                    GCancellable         *cancellable,
659                                    GError              **error)
660 {
661   GBufferedInputStreamPrivate *priv;
662   GInputStream *base_stream;
663   gssize nread;
664   gsize in_buffer;
665
666   priv = stream->priv;
667
668   if (count == -1)
669     count = priv->len;
670   
671   in_buffer = priv->end - priv->pos;
672
673   /* Never fill more than can fit in the buffer */
674   count = MIN (count, priv->len - in_buffer);
675
676   /* If requested length does not fit at end, compact */
677   if (priv->len - priv->end < count)
678     compact_buffer (stream);
679
680   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
681   nread = g_input_stream_read (base_stream,
682                                priv->buffer + priv->end,
683                                count,
684                                cancellable,
685                                error);
686
687   if (nread > 0)
688     priv->end += nread;
689   
690   return nread;
691 }
692
693 static gssize
694 g_buffered_input_stream_skip (GInputStream          *stream,
695                               gsize                  count,
696                               GCancellable          *cancellable,
697                               GError               **error)
698 {
699   GBufferedInputStream        *bstream;
700   GBufferedInputStreamPrivate *priv;
701   GInputStream *base_stream;
702   gsize available, bytes_skipped;
703   gssize nread;
704
705   bstream = G_BUFFERED_INPUT_STREAM (stream);
706   priv = bstream->priv;
707
708   available = priv->end - priv->pos;
709
710   if (count <= available)
711     {
712       priv->pos += count;
713       return count;
714     }
715
716   /* Full request not available, skip all currently availbile and request refill for more */
717   
718   priv->pos = 0;
719   priv->end = 0;
720   bytes_skipped = available;
721   count -= available;
722
723   if (bytes_skipped > 0)
724     error = NULL; /* Ignore further errors if we already read some data */
725   
726   if (count > priv->len)
727     {
728       /* Large request, shortcut buffer */
729       
730       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
731
732       nread = g_input_stream_skip (base_stream,
733                                    count,
734                                    cancellable,
735                                    error);
736       
737       if (nread < 0 && bytes_skipped == 0)
738         return -1;
739       
740       if (nread > 0)
741         bytes_skipped += nread;
742       
743       return bytes_skipped;
744     }
745   
746   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
747   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
748   g_input_stream_set_pending (stream, TRUE); /* enable again */
749   
750   if (nread < 0)
751     {
752       if (bytes_skipped == 0)
753         return -1;
754       else
755         return bytes_skipped;
756     }
757   
758   available = priv->end - priv->pos;
759   count = MIN (count, available);
760   
761   bytes_skipped += count;
762   priv->pos += count;
763   
764   return bytes_skipped;
765 }
766
767 static gssize
768 g_buffered_input_stream_read (GInputStream *stream,
769                               void         *buffer,
770                               gsize         count,
771                               GCancellable *cancellable,
772                               GError      **error)
773 {
774   GBufferedInputStream        *bstream;
775   GBufferedInputStreamPrivate *priv;
776   GInputStream *base_stream;
777   gsize available, bytes_read;
778   gssize nread;
779
780   bstream = G_BUFFERED_INPUT_STREAM (stream);
781   priv = bstream->priv;
782
783   available = priv->end - priv->pos;
784
785   if (count <= available)
786     {
787       memcpy (buffer, priv->buffer + priv->pos, count);
788       priv->pos += count;
789       return count;
790     }
791   
792   /* Full request not available, read all currently availbile and request refill for more */
793   
794   memcpy (buffer, priv->buffer + priv->pos, available);
795   priv->pos = 0;
796   priv->end = 0;
797   bytes_read = available;
798   count -= available;
799
800   if (bytes_read > 0)
801     error = NULL; /* Ignore further errors if we already read some data */
802   
803   if (count > priv->len)
804     {
805       /* Large request, shortcut buffer */
806       
807       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
808
809       nread = g_input_stream_read (base_stream,
810                                    (char *)buffer + bytes_read,
811                                    count,
812                                    cancellable,
813                                    error);
814       
815       if (nread < 0 && bytes_read == 0)
816         return -1;
817       
818       if (nread > 0)
819         bytes_read += nread;
820       
821       return bytes_read;
822     }
823   
824   g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
825   nread = g_buffered_input_stream_fill (bstream, priv->len, cancellable, error);
826   g_input_stream_set_pending (stream, TRUE); /* enable again */
827   if (nread < 0)
828     {
829       if (bytes_read == 0)
830         return -1;
831       else
832         return bytes_read;
833     }
834   
835   available = priv->end - priv->pos;
836   count = MIN (count, available);
837   
838   memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
839   bytes_read += count;
840   priv->pos += count;
841   
842   return bytes_read;
843 }
844
845 /**
846  * g_buffered_input_stream_read_byte:
847  * @stream: #GBufferedInputStream.
848  * @cancellable: optional #GCancellable object, %NULL to ignore.
849  * @error: location to store the error occuring, or %NULL to ignore.
850  *
851  * Tries to read a single byte from the stream or the buffer. Will block
852  * during this read.
853  *
854  * On success, the byte read from the stream is returned. On end of stream
855  * -1 is returned but it's not an exceptional error and @error is not set.
856  * 
857  * If @cancellable is not %NULL, then the operation can be cancelled by
858  * triggering the cancellable object from another thread. If the operation
859  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
860  * operation was partially finished when the operation was cancelled the
861  * partial result will be returned, without an error.
862  *
863  * On error -1 is returned and @error is set accordingly.
864  * 
865  * Returns: the byte read from the @stream, or -1 on end of stream or error.
866  **/
867 int
868 g_buffered_input_stream_read_byte (GBufferedInputStream  *stream,
869                                    GCancellable          *cancellable,
870                                    GError               **error)
871 {
872   GBufferedInputStreamPrivate *priv;
873   GInputStream *input_stream;
874   gsize available;
875   gssize nread;
876
877   g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
878
879   priv = stream->priv;
880   input_stream = G_INPUT_STREAM (stream);
881
882   if (g_input_stream_is_closed (input_stream))
883     {
884       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
885        _("Stream is already closed"));
886       return -1;
887     }
888
889   if (g_input_stream_has_pending (input_stream))
890     {
891       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
892        _("Stream has outstanding operation"));
893       return -1;
894     }
895
896   available = priv->end - priv->pos;
897
898   if (available < 1)
899     return priv->buffer[priv->pos++];
900
901   /* Byte not available, request refill for more */
902
903   if (cancellable)
904     g_push_current_cancellable (cancellable);
905
906   priv->pos = 0;
907   priv->end = 0;
908
909   nread = g_buffered_input_stream_fill (stream, priv->len, cancellable, error);
910
911   if (cancellable)
912     g_pop_current_cancellable (cancellable);
913
914   if (nread <= 0)
915     return -1; /* error or end of stream */
916
917   return priv->buffer[priv->pos++];
918 }
919
920 /* ************************** */
921 /* Async stuff implementation */
922 /* ************************** */
923
924 static void
925 fill_async_callback (GObject *source_object,
926                      GAsyncResult *result,
927                      gpointer user_data)
928 {
929   GError *error;
930   gssize res;
931   GSimpleAsyncResult *simple;
932
933   simple = user_data;
934   
935   error = NULL;
936   res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
937                                     result, &error);
938
939   g_simple_async_result_set_op_res_gssize (simple, res);
940   if (res == -1)
941     {
942       g_simple_async_result_set_from_error (simple, error);
943       g_error_free (error);
944     }
945   
946   /* Complete immediately, not in idle, since we're already in a mainloop callout */
947   g_simple_async_result_complete (simple);
948   g_object_unref (simple);
949 }
950
951 static void
952 g_buffered_input_stream_real_fill_async  (GBufferedInputStream *stream,
953                                           gssize                count,
954                                           int                   io_priority,
955                                           GCancellable         *cancellable,
956                                           GAsyncReadyCallback   callback,
957                                           gpointer              user_data)
958 {
959   GBufferedInputStreamPrivate *priv;
960   GInputStream *base_stream;
961   GSimpleAsyncResult *simple;
962   gsize in_buffer;
963
964   priv = stream->priv;
965
966   if (count == -1)
967     count = priv->len;
968   
969   in_buffer = priv->end - priv->pos;
970
971   /* Never fill more than can fit in the buffer */
972   count = MIN (count, priv->len - in_buffer);
973
974   /* If requested length does not fit at end, compact */
975   if (priv->len - priv->end < count)
976     compact_buffer (stream);
977
978   simple = g_simple_async_result_new (G_OBJECT (stream),
979                                       callback, user_data,
980                                       g_buffered_input_stream_real_fill_async);
981   
982   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
983   g_input_stream_read_async (base_stream,
984                              priv->buffer + priv->end,
985                              count,
986                              io_priority,
987                              cancellable,
988                              fill_async_callback,
989                              simple);
990 }
991
992 static gssize
993 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
994                                           GAsyncResult         *result,
995                                           GError              **error)
996 {
997   GSimpleAsyncResult *simple;
998   gssize nread;
999
1000   simple = G_SIMPLE_ASYNC_RESULT (result);
1001   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1002   
1003   nread = g_simple_async_result_get_op_res_gssize (simple);
1004   return nread;
1005 }
1006
1007 typedef struct {
1008   gssize bytes_read;
1009   gssize count;
1010   void *buffer;
1011 } ReadAsyncData;
1012
1013 static void 
1014 free_read_async_data (gpointer _data)
1015 {
1016   ReadAsyncData *data = _data;
1017   g_slice_free (ReadAsyncData, data);
1018 }
1019
1020 static void
1021 large_read_callback (GObject *source_object,
1022                      GAsyncResult *result,
1023                      gpointer user_data)
1024 {
1025   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1026   ReadAsyncData *data;
1027   GError *error;
1028   gssize nread;
1029
1030   data = g_simple_async_result_get_op_res_gpointer (simple);
1031   
1032   error = NULL;
1033   nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1034                                       result, &error);
1035
1036   /* Only report the error if we've not already read some data */
1037   if (nread < 0 && data->bytes_read == 0)
1038     g_simple_async_result_set_from_error (simple, error);
1039   
1040   if (nread > 0)
1041     data->bytes_read += nread;
1042   
1043   if (error)
1044     g_error_free (error);
1045   
1046   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1047   g_simple_async_result_complete (simple);
1048   g_object_unref (simple);
1049 }
1050
1051 static void
1052 read_fill_buffer_callback (GObject *source_object,
1053                            GAsyncResult *result,
1054                            gpointer user_data)
1055 {
1056   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1057   GBufferedInputStream *bstream;
1058   GBufferedInputStreamPrivate *priv;
1059   ReadAsyncData *data;
1060   GError *error;
1061   gssize nread;
1062   gsize available;
1063
1064   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1065   priv = bstream->priv;
1066   
1067   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1068   
1069   data = g_simple_async_result_get_op_res_gpointer (simple);
1070   
1071   error = NULL;
1072   nread = g_buffered_input_stream_fill_finish (bstream,
1073                                                result, &error);
1074   
1075   if (nread < 0 && data->bytes_read == 0)
1076     g_simple_async_result_set_from_error (simple, error);
1077
1078
1079   if (nread > 0)
1080     {
1081       available = priv->end - priv->pos;
1082       data->count = MIN (data->count, available);
1083       
1084       memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1085       data->bytes_read += data->count;
1086       priv->pos += data->count;
1087     }
1088
1089   if (error)
1090     g_error_free (error);
1091   
1092   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1093   g_simple_async_result_complete (simple);
1094   g_object_unref (simple);
1095 }
1096
1097 static void
1098 g_buffered_input_stream_read_async (GInputStream              *stream,
1099                                     void                      *buffer,
1100                                     gsize                      count,
1101                                     int                        io_priority,
1102                                     GCancellable              *cancellable,
1103                                     GAsyncReadyCallback        callback,
1104                                     gpointer                   user_data)
1105 {
1106   GBufferedInputStream *bstream;
1107   GBufferedInputStreamPrivate *priv;
1108   GInputStream *base_stream;
1109   gsize available;
1110   GSimpleAsyncResult *simple;
1111   ReadAsyncData *data;
1112
1113   bstream = G_BUFFERED_INPUT_STREAM (stream);
1114   priv = bstream->priv;
1115
1116   data = g_slice_new (ReadAsyncData);
1117   data->buffer = buffer;
1118   data->bytes_read = 0;
1119   simple = g_simple_async_result_new (G_OBJECT (stream),
1120                                       callback, user_data,
1121                                       g_buffered_input_stream_read_async);
1122   g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1123   
1124   available = priv->end - priv->pos;
1125   
1126   if (count <= available)
1127     {
1128       memcpy (buffer, priv->buffer + priv->pos, count);
1129       priv->pos += count;
1130       data->bytes_read = count;
1131       
1132       g_simple_async_result_complete_in_idle (simple);
1133       g_object_unref (simple);
1134       return;
1135     }
1136
1137
1138   /* Full request not available, read all currently availbile and request refill for more */
1139   
1140   memcpy (buffer, priv->buffer + priv->pos, available);
1141   priv->pos = 0;
1142   priv->end = 0;
1143   
1144   count -= available;
1145   
1146   data->bytes_read = available;
1147   data->count = count;
1148
1149   if (count > priv->len)
1150     {
1151       /* Large request, shortcut buffer */
1152       
1153       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1154       
1155       g_input_stream_read_async (base_stream,
1156                                  (char *)buffer + data->bytes_read,
1157                                  count,
1158                                  io_priority, cancellable,
1159                                  large_read_callback,
1160                                  simple);
1161     }
1162   else
1163     {
1164       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1165       g_buffered_input_stream_fill_async (bstream, priv->len,
1166                                           io_priority, cancellable,
1167                                           read_fill_buffer_callback, simple);
1168     }
1169 }
1170
1171 static gssize
1172 g_buffered_input_stream_read_finish (GInputStream   *stream,
1173                                      GAsyncResult   *result,
1174                                      GError        **error)
1175 {
1176   GSimpleAsyncResult *simple;
1177   ReadAsyncData *data;
1178   
1179   simple = G_SIMPLE_ASYNC_RESULT (result);
1180   
1181   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1182
1183   data = g_simple_async_result_get_op_res_gpointer (simple);
1184   
1185   return data->bytes_read;
1186 }
1187
1188 typedef struct {
1189   gssize bytes_skipped;
1190   gssize count;
1191 } SkipAsyncData;
1192
1193 static void 
1194 free_skip_async_data (gpointer _data)
1195 {
1196   SkipAsyncData *data = _data;
1197   g_slice_free (SkipAsyncData, data);
1198 }
1199
1200 static void
1201 large_skip_callback (GObject *source_object,
1202                      GAsyncResult *result,
1203                      gpointer user_data)
1204 {
1205   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1206   SkipAsyncData *data;
1207   GError *error;
1208   gssize nread;
1209
1210   data = g_simple_async_result_get_op_res_gpointer (simple);
1211   
1212   error = NULL;
1213   nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1214                                       result, &error);
1215
1216   /* Only report the error if we've not already read some data */
1217   if (nread < 0 && data->bytes_skipped == 0)
1218     g_simple_async_result_set_from_error (simple, error);
1219   
1220   if (nread > 0)
1221     data->bytes_skipped += nread;
1222   
1223   if (error)
1224     g_error_free (error);
1225   
1226   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1227   g_simple_async_result_complete (simple);
1228   g_object_unref (simple);
1229 }
1230
1231 static void
1232 skip_fill_buffer_callback (GObject *source_object,
1233                            GAsyncResult *result,
1234                            gpointer user_data)
1235 {
1236   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1237   GBufferedInputStream *bstream;
1238   GBufferedInputStreamPrivate *priv;
1239   SkipAsyncData *data;
1240   GError *error;
1241   gssize nread;
1242   gsize available;
1243
1244   bstream = G_BUFFERED_INPUT_STREAM (source_object);
1245   priv = bstream->priv;
1246   
1247   g_input_stream_set_pending (G_INPUT_STREAM (bstream), TRUE); /* enable again */
1248   
1249   data = g_simple_async_result_get_op_res_gpointer (simple);
1250   
1251   error = NULL;
1252   nread = g_buffered_input_stream_fill_finish (bstream,
1253                                                result, &error);
1254   
1255   if (nread < 0 && data->bytes_skipped == 0)
1256     g_simple_async_result_set_from_error (simple, error);
1257
1258
1259   if (nread > 0)
1260     {
1261       available = priv->end - priv->pos;
1262       data->count = MIN (data->count, available);
1263       
1264       data->bytes_skipped += data->count;
1265       priv->pos += data->count;
1266     }
1267
1268   if (error)
1269     g_error_free (error);
1270   
1271   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1272   g_simple_async_result_complete (simple);
1273   g_object_unref (simple);
1274 }
1275
1276 static void
1277 g_buffered_input_stream_skip_async (GInputStream              *stream,
1278                                     gsize                      count,
1279                                     int                        io_priority,
1280                                     GCancellable              *cancellable,
1281                                     GAsyncReadyCallback        callback,
1282                                     gpointer                   user_data)
1283 {
1284   GBufferedInputStream *bstream;
1285   GBufferedInputStreamPrivate *priv;
1286   GInputStream *base_stream;
1287   gsize available;
1288   GSimpleAsyncResult *simple;
1289   SkipAsyncData *data;
1290
1291   bstream = G_BUFFERED_INPUT_STREAM (stream);
1292   priv = bstream->priv;
1293
1294   data = g_slice_new (SkipAsyncData);
1295   data->bytes_skipped = 0;
1296   simple = g_simple_async_result_new (G_OBJECT (stream),
1297                                       callback, user_data,
1298                                       g_buffered_input_stream_skip_async);
1299   g_simple_async_result_set_op_res_gpointer (simple, data, free_skip_async_data);
1300   
1301   available = priv->end - priv->pos;
1302   
1303   if (count <= available)
1304     {
1305       priv->pos += count;
1306       data->bytes_skipped = count;
1307       
1308       g_simple_async_result_complete_in_idle (simple);
1309       g_object_unref (simple);
1310       return;
1311     }
1312
1313
1314   /* Full request not available, skip all currently availbile and request refill for more */
1315   
1316   priv->pos = 0;
1317   priv->end = 0;
1318   
1319   count -= available;
1320   
1321   data->bytes_skipped = available;
1322   data->count = count;
1323
1324   if (count > priv->len)
1325     {
1326       /* Large request, shortcut buffer */
1327       
1328       base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1329       
1330       g_input_stream_skip_async (base_stream,
1331                                  count,
1332                                  io_priority, cancellable,
1333                                  large_skip_callback,
1334                                  simple);
1335     }
1336   else
1337     {
1338       g_input_stream_set_pending (stream, FALSE); /* to avoid already pending error */
1339       g_buffered_input_stream_fill_async (bstream, priv->len,
1340                                           io_priority, cancellable,
1341                                           skip_fill_buffer_callback, simple);
1342     }
1343 }
1344
1345 static gssize
1346 g_buffered_input_stream_skip_finish (GInputStream   *stream,
1347                                      GAsyncResult   *result,
1348                                      GError        **error)
1349 {
1350   GSimpleAsyncResult *simple;
1351   SkipAsyncData *data;
1352   
1353   simple = G_SIMPLE_ASYNC_RESULT (result);
1354   
1355   g_assert (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_skip_async);
1356
1357   data = g_simple_async_result_get_op_res_gpointer (simple);
1358   
1359   return data->bytes_skipped;
1360 }
1361
1362 /* vim: ts=2 sw=2 et */