Bumps documentation to 93% symbol coverage, touching most
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gsimpleasyncresult.h"
30
31 /**
32  * SECTION:ginputstream
33  * @short_description: base class for implementing streaming input
34  *
35  * 
36  * 
37  **/
38
39 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
40
41 struct _GInputStreamPrivate {
42   guint closed : 1;
43   guint pending : 1;
44   GAsyncReadyCallback outstanding_callback;
45 };
46
47 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
48                                                   gsize                 count,
49                                                   GCancellable         *cancellable,
50                                                   GError              **error);
51 static void     g_input_stream_real_read_async   (GInputStream         *stream,
52                                                   void                 *buffer,
53                                                   gsize                 count,
54                                                   int                   io_priority,
55                                                   GCancellable         *cancellable,
56                                                   GAsyncReadyCallback   callback,
57                                                   gpointer              user_data);
58 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
59                                                   GAsyncResult         *result,
60                                                   GError              **error);
61 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              data);
67 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_close_async  (GInputStream         *stream,
71                                                   int                   io_priority,
72                                                   GCancellable         *cancellable,
73                                                   GAsyncReadyCallback   callback,
74                                                   gpointer              data);
75 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
76                                                   GAsyncResult         *result,
77                                                   GError              **error);
78
79 static void
80 g_input_stream_finalize (GObject *object)
81 {
82   GInputStream *stream;
83
84   stream = G_INPUT_STREAM (object);
85   
86   if (!stream->priv->closed)
87     g_input_stream_close (stream, NULL, NULL);
88
89   if (G_OBJECT_CLASS (g_input_stream_parent_class)->finalize)
90     (*G_OBJECT_CLASS (g_input_stream_parent_class)->finalize) (object);
91 }
92
93 static void
94 g_input_stream_dispose (GObject *object)
95 {
96   GInputStream *stream;
97
98   stream = G_INPUT_STREAM (object);
99   
100   if (!stream->priv->closed)
101     g_input_stream_close (stream, NULL, NULL);
102   
103   if (G_OBJECT_CLASS (g_input_stream_parent_class)->dispose)
104     (*G_OBJECT_CLASS (g_input_stream_parent_class)->dispose) (object);
105 }
106
107
108 static void
109 g_input_stream_class_init (GInputStreamClass *klass)
110 {
111   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
112   
113   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
114   
115   gobject_class->finalize = g_input_stream_finalize;
116   gobject_class->dispose = g_input_stream_dispose;
117   
118   klass->skip = g_input_stream_real_skip;
119   klass->read_async = g_input_stream_real_read_async;
120   klass->read_finish = g_input_stream_real_read_finish;
121   klass->skip_async = g_input_stream_real_skip_async;
122   klass->skip_finish = g_input_stream_real_skip_finish;
123   klass->close_async = g_input_stream_real_close_async;
124   klass->close_finish = g_input_stream_real_close_finish;
125 }
126
127 static void
128 g_input_stream_init (GInputStream *stream)
129 {
130   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
131                                               G_TYPE_INPUT_STREAM,
132                                               GInputStreamPrivate);
133 }
134
135 /**
136  * g_input_stream_read:
137  * @stream: a #GInputStream.
138  * @buffer: a buffer to read data into (which should be at least count bytes long).
139  * @count: the number of bytes that will be read from the stream
140  * @cancellable: optional #GCancellable object, %NULL to ignore.
141  * @error: location to store the error occuring, or %NULL to ignore
142  *
143  * Tries to read @count bytes from the stream into the buffer starting at
144  * @buffer. Will block during this read.
145  * 
146  * If count is zero returns zero and does nothing. A value of @count
147  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
148  *
149  * On success, the number of bytes read into the buffer is returned.
150  * It is not an error if this is not the same as the requested size, as it
151  * can happen e.g. near the end of a file. Zero is returned on end of file
152  * (or if @count is zero),  but never otherwise.
153  *
154  * If @cancellable is not NULL, then the operation can be cancelled by
155  * triggering the cancellable object from another thread. If the operation
156  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
157  * operation was partially finished when the operation was cancelled the
158  * partial result will be returned, without an error.
159  *
160  * On error -1 is returned and @error is set accordingly.
161  * 
162  * Return value: Number of bytes read, or -1 on error
163  **/
164 gssize
165 g_input_stream_read  (GInputStream  *stream,
166                       void          *buffer,
167                       gsize          count,
168                       GCancellable  *cancellable,
169                       GError       **error)
170 {
171   GInputStreamClass *class;
172   gssize res;
173
174   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
175   g_return_val_if_fail (buffer != NULL, 0);
176
177   if (count == 0)
178     return 0;
179   
180   if (((gssize) count) < 0)
181     {
182       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
183                    _("Too large count value passed to g_input_stream_read"));
184       return -1;
185     }
186
187   if (stream->priv->closed)
188     {
189       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
190                    _("Stream is already closed"));
191       return -1;
192     }
193   
194   if (stream->priv->pending)
195     {
196       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
197                    _("Stream has outstanding operation"));
198       return -1;
199     }
200   
201   class = G_INPUT_STREAM_GET_CLASS (stream);
202
203   if (class->read == NULL) 
204     {
205       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
206                    _("Input stream doesn't implement read"));
207       return -1;
208     }
209
210   if (cancellable)
211     g_push_current_cancellable (cancellable);
212   
213   stream->priv->pending = TRUE;
214   res = class->read (stream, buffer, count, cancellable, error);
215   stream->priv->pending = FALSE;
216
217   if (cancellable)
218     g_pop_current_cancellable (cancellable);
219   
220   return res;
221 }
222
223 /**
224  * g_input_stream_read_all:
225  * @stream: a #GInputStream.
226  * @buffer: a buffer to read data into (which should be at least count bytes long).
227  * @count: the number of bytes that will be read from the stream
228  * @bytes_read: location to store the number of bytes that was read from the stream
229  * @cancellable: optional #GCancellable object, %NULL to ignore.
230  * @error: location to store the error occuring, or %NULL to ignore
231  *
232  * Tries to read @count bytes from the stream into the buffer starting at
233  * @buffer. Will block during this read.
234  *
235  * This function is similar to g_input_stream_read(), except it tries to
236  * read as many bytes as requested, only stopping on an error or end of stream.
237  *
238  * On a successful read of @count bytes, or if we reached the end of the
239  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
240  * read into @buffer.
241  * 
242  * If there is an error during the operation %FALSE is returned and @error
243  * is set to indicate the error status, @bytes_read is updated to contain
244  * the number of bytes read into @buffer before the error occured.
245  *
246  * Return value: %TRUE on success, %FALSE if there was an error
247  **/
248 gboolean
249 g_input_stream_read_all (GInputStream              *stream,
250                          void                      *buffer,
251                          gsize                      count,
252                          gsize                     *bytes_read,
253                          GCancellable              *cancellable,
254                          GError                   **error)
255 {
256   gsize _bytes_read;
257   gssize res;
258
259   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
260   g_return_val_if_fail (buffer != NULL, FALSE);
261
262   _bytes_read = 0;
263   while (_bytes_read < count)
264     {
265       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
266                                  cancellable, error);
267       if (res == -1)
268         {
269           if (bytes_read)
270             *bytes_read = _bytes_read;
271           return FALSE;
272         }
273       
274       if (res == 0)
275         break;
276
277       _bytes_read += res;
278     }
279
280   if (bytes_read)
281     *bytes_read = _bytes_read;
282   return TRUE;
283 }
284
285 /**
286  * g_input_stream_skip:
287  * @stream: a #GInputStream.
288  * @count: the number of bytes that will be skipped from the stream
289  * @cancellable: optional #GCancellable object, %NULL to ignore. 
290  * @error: location to store the error occuring, or %NULL to ignore
291  *
292  * Tries to skip @count bytes from the stream. Will block during the operation.
293  *
294  * This is identical to g_input_stream_read(), from a behaviour standpoint,
295  * but the bytes that are skipped are not returned to the user. Some
296  * streams have an implementation that is more efficient than reading the data.
297  *
298  * This function is optional for inherited classes, as the default implementation
299  * emulates it using read.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * Return value: Number of bytes skipped, or -1 on error
308  **/
309 gssize
310 g_input_stream_skip (GInputStream         *stream,
311                      gsize                 count,
312                      GCancellable         *cancellable,
313                      GError              **error)
314 {
315   GInputStreamClass *class;
316   gssize res;
317
318   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
319
320   if (count == 0)
321     return 0;
322
323   if (((gssize) count) < 0)
324     {
325       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
326                    _("Too large count value passed to g_input_stream_skip"));
327       return -1;
328     }
329   
330   if (stream->priv->closed)
331     {
332       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
333                    _("Stream is already closed"));
334       return -1;
335     }
336
337   if (stream->priv->pending)
338     {
339       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
340                    _("Stream has outstanding operation"));
341       return -1;
342     }
343   
344   class = G_INPUT_STREAM_GET_CLASS (stream);
345
346   if (cancellable)
347     g_push_current_cancellable (cancellable);
348   
349   stream->priv->pending = TRUE;
350   res = class->skip (stream, count, cancellable, error);
351   stream->priv->pending = FALSE;
352
353   if (cancellable)
354     g_pop_current_cancellable (cancellable);
355   
356   return res;
357 }
358
359 static gssize
360 g_input_stream_real_skip (GInputStream         *stream,
361                           gsize                 count,
362                           GCancellable         *cancellable,
363                           GError              **error)
364 {
365   GInputStreamClass *class;
366   gssize ret, read_bytes;
367   char buffer[8192];
368   GError *my_error;
369
370   class = G_INPUT_STREAM_GET_CLASS (stream);
371
372   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
373     {
374       if (g_seekable_seek (G_SEEKABLE (stream),
375                            count,
376                            G_SEEK_CUR,
377                            cancellable,
378                            NULL))
379         return count;
380     }
381
382   /* If not seekable, or seek failed, fall back to reading data: */
383
384   class = G_INPUT_STREAM_GET_CLASS (stream);
385   
386   read_bytes = 0;
387   while (1)
388     {
389       my_error = NULL;
390
391       ret = class->read (stream, buffer, MIN (sizeof (buffer), count),
392                          cancellable, &my_error);
393       if (ret == -1)
394         {
395           if (read_bytes > 0 &&
396               my_error->domain == G_IO_ERROR &&
397               my_error->code == G_IO_ERROR_CANCELLED)
398             {
399               g_error_free (my_error);
400               return read_bytes;
401             }
402           
403           g_propagate_error (error, my_error);
404           return -1;
405         }
406
407       count -= ret;
408       read_bytes += ret;
409       
410       if (ret == 0 || count == 0)
411         return read_bytes;
412     }
413 }
414
415 /**
416  * g_input_stream_close:
417  * @stream: A #GInputStream.
418  * @cancellable: optional #GCancellable object, %NULL to ignore.
419  * @error: location to store the error occuring, or %NULL to ignore
420  *
421  * Closes the stream, releasing resources related to it.
422  *
423  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
424  * Closing a stream multiple times will not return an error.
425  *
426  * Streams will be automatically closed when the last reference
427  * is dropped, but you might want to call make sure resources
428  * are released as early as possible.
429  *
430  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
431  * open after the stream is closed. See the documentation for the individual
432  * stream for details.
433  *
434  * On failure the first error that happened will be reported, but the close
435  * operation will finish as much as possible. A stream that failed to
436  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
437  * is important to check and report the error to the user.
438  *
439  * If @cancellable is not NULL, then the operation can be cancelled by
440  * triggering the cancellable object from another thread. If the operation
441  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
442  * Cancelling a close will still leave the stream closed, but some streams
443  * can use a faster close that doesn't block to e.g. check errors. 
444  *
445  * Return value: %TRUE on success, %FALSE on failure
446  **/
447 gboolean
448 g_input_stream_close (GInputStream  *stream,
449                       GCancellable  *cancellable,
450                       GError       **error)
451 {
452   GInputStreamClass *class;
453   gboolean res;
454
455   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
456
457   class = G_INPUT_STREAM_GET_CLASS (stream);
458
459   if (stream->priv->closed)
460     return TRUE;
461
462   if (stream->priv->pending)
463     {
464       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
465                    _("Stream has outstanding operation"));
466       return FALSE;
467     }
468   
469   res = TRUE;
470
471   stream->priv->pending = TRUE;
472
473   if (cancellable)
474     g_push_current_cancellable (cancellable);
475
476   if (class->close)
477     res = class->close (stream, cancellable, error);
478
479   if (cancellable)
480     g_pop_current_cancellable (cancellable);
481   
482   stream->priv->closed = TRUE;
483   
484   stream->priv->pending = FALSE;
485
486   return res;
487 }
488
489 static void
490 async_ready_callback_wrapper (GObject *source_object,
491                               GAsyncResult *res,
492                               gpointer      user_data)
493 {
494   GInputStream *stream = G_INPUT_STREAM (source_object);
495
496   stream->priv->pending = FALSE;
497   if (stream->priv->outstanding_callback)
498     (*stream->priv->outstanding_callback) (source_object, res, user_data);
499   g_object_unref (stream);
500 }
501
502 static void
503 async_ready_close_callback_wrapper (GObject *source_object,
504                                     GAsyncResult *res,
505                                     gpointer      user_data)
506 {
507   GInputStream *stream = G_INPUT_STREAM (source_object);
508
509   stream->priv->pending = FALSE;
510   stream->priv->closed = TRUE;
511   if (stream->priv->outstanding_callback)
512     (*stream->priv->outstanding_callback) (source_object, res, user_data);
513   g_object_unref (stream);
514 }
515
516 /**
517  * g_input_stream_read_async:
518  * @stream: A #GInputStream.
519  * @buffer: a buffer to read data into (which should be at least count bytes long).
520  * @count: the number of bytes that will be read from the stream
521  * @io_priority: the io priority of the request. the io priority of the request
522  * @cancellable: optional #GCancellable object, %NULL to ignore.
523  * @callback: callback to call when the request is satisfied
524  * @user_data: the data to pass to callback function
525  *
526  * Request an asynchronous read of @count bytes from the stream into the buffer
527  * starting at @buffer. When the operation is finished @callback will be called,
528  * giving the results.
529  *
530  * During an async request no other sync and async calls are allowed, and will
531  * result in %G_IO_ERROR_PENDING errors. 
532  *
533  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
534  *
535  * On success, the number of bytes read into the buffer will be passed to the
536  * callback. It is not an error if this is not the same as the requested size, as it
537  * can happen e.g. near the end of a file, but generally we try to read
538  * as many bytes as requested. Zero is returned on end of file
539  * (or if @count is zero),  but never otherwise.
540  *
541  * Any outstanding i/o request with higher priority (lower numerical value) will
542  * be executed before an outstanding request with lower priority. Default
543  * priority is %G_PRIORITY_DEFAULT.
544  *
545  * The asyncronous methods have a default fallback that uses threads to implement
546  * asynchronicity, so they are optional for inheriting classes. However, if you
547  * override one you must override all.
548  **/
549 void
550 g_input_stream_read_async (GInputStream        *stream,
551                            void                *buffer,
552                            gsize                count,
553                            int                  io_priority,
554                            GCancellable        *cancellable,
555                            GAsyncReadyCallback  callback,
556                            gpointer             user_data)
557 {
558   GInputStreamClass *class;
559   GSimpleAsyncResult *simple;
560
561   g_return_if_fail (G_IS_INPUT_STREAM (stream));
562   g_return_if_fail (buffer != NULL);
563
564   if (count == 0)
565     {
566       simple = g_simple_async_result_new (G_OBJECT (stream),
567                                           callback,
568                                           user_data,
569                                           g_input_stream_read_async);
570       g_simple_async_result_complete_in_idle (simple);
571       g_object_unref (simple);
572       return;
573     }
574   
575   if (((gssize) count) < 0)
576     {
577       g_simple_async_report_error_in_idle (G_OBJECT (stream),
578                                            callback,
579                                            user_data,
580                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
581                                            _("Too large count value passed to g_input_stream_read_async"));
582       return;
583     }
584
585   if (stream->priv->closed)
586     {
587       g_simple_async_report_error_in_idle (G_OBJECT (stream),
588                                            callback,
589                                            user_data,
590                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
591                                            _("Stream is already closed"));
592       return;
593     }
594   
595   if (stream->priv->pending)
596     {
597       g_simple_async_report_error_in_idle (G_OBJECT (stream),
598                                            callback,
599                                            user_data,
600                                            G_IO_ERROR, G_IO_ERROR_PENDING,
601                                            _("Stream has outstanding operation"));
602       return;
603     }
604
605   class = G_INPUT_STREAM_GET_CLASS (stream);
606   
607   stream->priv->pending = TRUE;
608   stream->priv->outstanding_callback = callback;
609   g_object_ref (stream);
610   class->read_async (stream, buffer, count, io_priority, cancellable,
611                      async_ready_callback_wrapper, user_data);
612 }
613
614 /**
615  * g_input_stream_read_finish:
616  * @stream: a #GInputStream.
617  * @result: a #GAsyncResult.
618  * @error: a #GError location to store the error occuring, or %NULL to 
619  * ignore.
620  * 
621  * Finishes an asynchronous stream read operation. 
622  * 
623  * Returns: number of bytes read in, or -1 on error.
624  **/
625 gssize
626 g_input_stream_read_finish (GInputStream              *stream,
627                             GAsyncResult              *result,
628                             GError                   **error)
629 {
630   GSimpleAsyncResult *simple;
631   GInputStreamClass *class;
632   
633   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
634   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
635
636   if (G_IS_SIMPLE_ASYNC_RESULT (result))
637     {
638       simple = G_SIMPLE_ASYNC_RESULT (result);
639       if (g_simple_async_result_propagate_error (simple, error))
640         return -1;
641
642       /* Special case read of 0 bytes */
643       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
644         return 0;
645     }
646
647   class = G_INPUT_STREAM_GET_CLASS (stream);
648   return class->read_finish (stream, result, error);
649 }
650
651 /**
652  * g_input_stream_skip_async:
653  * @stream: A #GInputStream.
654  * @count: the number of bytes that will be skipped from the stream
655  * @io_priority: the io priority of the request. the io priority of the request
656  * @cancellable: optional #GCancellable object, %NULL to ignore. 
657  * @callback: callback to call when the request is satisfied
658  * @user_data: the data to pass to callback function
659  *
660  * Request an asynchronous skip of @count bytes from the stream into the buffer
661  * starting at @buffer. When the operation is finished @callback will be called,
662  * giving the results.
663  *
664  * During an async request no other sync and async calls are allowed, and will
665  * result in %G_IO_ERROR_PENDING errors. 
666  *
667  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
668  *
669  * On success, the number of bytes skipped will be passed to the
670  * callback. It is not an error if this is not the same as the requested size, as it
671  * can happen e.g. near the end of a file, but generally we try to skip
672  * as many bytes as requested. Zero is returned on end of file
673  * (or if @count is zero), but never otherwise.
674  *
675  * Any outstanding i/o request with higher priority (lower numerical value) will
676  * be executed before an outstanding request with lower priority. Default
677  * priority is %G_PRIORITY_DEFAULT.
678  *
679  * The asyncronous methods have a default fallback that uses threads to implement
680  * asynchronicity, so they are optional for inheriting classes. However, if you
681  * override one you must override all.
682  **/
683 void
684 g_input_stream_skip_async (GInputStream        *stream,
685                            gsize                count,
686                            int                  io_priority,
687                            GCancellable        *cancellable,
688                            GAsyncReadyCallback  callback,
689                            gpointer             user_data)
690 {
691   GInputStreamClass *class;
692   GSimpleAsyncResult *simple;
693
694   g_return_if_fail (G_IS_INPUT_STREAM (stream));
695
696   if (count == 0)
697     {
698       simple = g_simple_async_result_new (G_OBJECT (stream),
699                                           callback,
700                                           user_data,
701                                           g_input_stream_skip_async);
702
703       g_simple_async_result_complete_in_idle (simple);
704       g_object_unref (simple);
705       return;
706     }
707   
708   if (((gssize) count) < 0)
709     {
710       g_simple_async_report_error_in_idle (G_OBJECT (stream),
711                                            callback,
712                                            user_data,
713                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
714                                            _("Too large count value passed to g_input_stream_skip_async"));
715       return;
716     }
717
718   if (stream->priv->closed)
719     {
720       g_simple_async_report_error_in_idle (G_OBJECT (stream),
721                                            callback,
722                                            user_data,
723                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
724                                            _("Stream is already closed"));
725       return;
726     }
727   
728   if (stream->priv->pending)
729     {
730       g_simple_async_report_error_in_idle (G_OBJECT (stream),
731                                            callback,
732                                            user_data,
733                                            G_IO_ERROR, G_IO_ERROR_PENDING,
734                                            _("Stream has outstanding operation"));
735       return;
736     }
737
738   class = G_INPUT_STREAM_GET_CLASS (stream);
739   stream->priv->pending = TRUE;
740   stream->priv->outstanding_callback = callback;
741   g_object_ref (stream);
742   class->skip_async (stream, count, io_priority, cancellable,
743                      async_ready_callback_wrapper, user_data);
744 }
745
746 /**
747  * g_input_stream_skip_finish:
748  * @stream: a #GInputStream.
749  * @result: a #GAsyncResult.
750  * @error: a #GError location to store the error occuring, or %NULL to 
751  * ignore.
752  * 
753  * Finishes a stream skip operation.
754  * 
755  * Returns: the size of the bytes skipped, or %-1 on error.
756  **/
757 gssize
758 g_input_stream_skip_finish (GInputStream              *stream,
759                             GAsyncResult              *result,
760                             GError                   **error)
761 {
762   GSimpleAsyncResult *simple;
763   GInputStreamClass *class;
764
765   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
766   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
767
768   if (G_IS_SIMPLE_ASYNC_RESULT (result))
769     {
770       simple = G_SIMPLE_ASYNC_RESULT (result);
771       if (g_simple_async_result_propagate_error (simple, error))
772         return -1;
773
774       /* Special case skip of 0 bytes */
775       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
776         return 0;
777     }
778
779   class = G_INPUT_STREAM_GET_CLASS (stream);
780   return class->skip_finish (stream, result, error);
781 }
782
783 /**
784  * g_input_stream_close_async:
785  * @stream: A #GInputStream.
786  * @io_priority: the io priority of the request. the io priority of the request
787  * @cancellable: optional cancellable object
788  * @callback: callback to call when the request is satisfied
789  * @user_data: the data to pass to callback function
790  *
791  * Requests an asynchronous closes of the stream, releasing resources related to it.
792  * When the operation is finished @callback will be called, giving the results.
793  *
794  * For behaviour details see g_input_stream_close().
795  *
796  * The asyncronous methods have a default fallback that uses threads to implement
797  * asynchronicity, so they are optional for inheriting classes. However, if you
798  * override one you must override all.
799  **/
800 void
801 g_input_stream_close_async (GInputStream       *stream,
802                             int                 io_priority,
803                             GCancellable       *cancellable,
804                             GAsyncReadyCallback callback,
805                             gpointer            user_data)
806 {
807   GInputStreamClass *class;
808   GSimpleAsyncResult *simple;
809
810   g_return_if_fail (G_IS_INPUT_STREAM (stream));
811
812   if (stream->priv->closed)
813     {
814       simple = g_simple_async_result_new (G_OBJECT (stream),
815                                           callback,
816                                           user_data,
817                                           g_input_stream_close_async);
818
819       g_simple_async_result_complete_in_idle (simple);
820       g_object_unref (simple);
821       return;
822     }
823
824   if (stream->priv->pending)
825     {
826       g_simple_async_report_error_in_idle (G_OBJECT (stream),
827                                            callback,
828                                            user_data,
829                                            G_IO_ERROR, G_IO_ERROR_PENDING,
830                                            _("Stream has outstanding operation"));
831       return;
832     }
833   
834   class = G_INPUT_STREAM_GET_CLASS (stream);
835   stream->priv->pending = TRUE;
836   stream->priv->outstanding_callback = callback;
837   g_object_ref (stream);
838   class->close_async (stream, io_priority, cancellable,
839                       async_ready_close_callback_wrapper, user_data);
840 }
841
842 /**
843  * g_input_stream_close_finish:
844  * @stream: a #GInputStream.
845  * @result: a #GAsyncResult.
846  * @error: a #GError location to store the error occuring, or %NULL to 
847  * ignore.
848  * 
849  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
850  * 
851  * Returns: %TRUE if the stream was closed successfully.
852  **/
853 gboolean
854 g_input_stream_close_finish (GInputStream              *stream,
855                              GAsyncResult              *result,
856                              GError                   **error)
857 {
858   GSimpleAsyncResult *simple;
859   GInputStreamClass *class;
860
861   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
862   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
863
864   if (G_IS_SIMPLE_ASYNC_RESULT (result))
865     {
866       simple = G_SIMPLE_ASYNC_RESULT (result);
867       if (g_simple_async_result_propagate_error (simple, error))
868         return FALSE;
869
870       /* Special case already closed */
871       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
872         return TRUE;
873     }
874
875   class = G_INPUT_STREAM_GET_CLASS (stream);
876   return class->close_finish (stream, result, error);
877 }
878
879 /**
880  * g_input_stream_is_closed:
881  * @stream: input stream.
882  * 
883  * Checks if an input stream is closed.
884  * 
885  * Returns: %TRUE if the stream is closed.
886  **/
887 gboolean
888 g_input_stream_is_closed (GInputStream *stream)
889 {
890   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
891   
892   return stream->priv->closed;
893 }
894  
895 /**
896  * g_input_stream_has_pending:
897  * @stream: input stream.
898  * 
899  * Checks if an input stream has pending actions.
900  * 
901  * Returns: %TRUE if @stream has pending actions.
902  **/  
903 gboolean
904 g_input_stream_has_pending (GInputStream *stream)
905 {
906   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
907   
908   return stream->priv->pending;
909 }
910
911 /**
912  * g_input_stream_set_pending:
913  * @stream: input stream
914  * @pending: boolean.
915  * 
916  * Sets @stream has actions pending.
917  **/
918 void
919 g_input_stream_set_pending (GInputStream              *stream,
920                             gboolean                   pending)
921 {
922   g_return_if_fail (G_IS_INPUT_STREAM (stream));
923   
924   stream->priv->pending = pending;
925 }
926
927 /********************************************
928  *   Default implementation of async ops    *
929  ********************************************/
930
931 typedef struct {
932   void              *buffer;
933   gsize              count_requested;
934   gssize             count_read;
935 } ReadData;
936
937 static void
938 read_async_thread (GSimpleAsyncResult *res,
939                    GObject *object,
940                    GCancellable *cancellable)
941 {
942   ReadData *op;
943   GInputStreamClass *class;
944   GError *error = NULL;
945  
946   op = g_simple_async_result_get_op_res_gpointer (res);
947
948   class = G_INPUT_STREAM_GET_CLASS (object);
949
950   op->count_read = class->read (G_INPUT_STREAM (object),
951                                 op->buffer, op->count_requested,
952                                 cancellable, &error);
953   if (op->count_read == -1)
954     {
955       g_simple_async_result_set_from_error (res, error);
956       g_error_free (error);
957     }
958 }
959
960 static void
961 g_input_stream_real_read_async (GInputStream *stream,
962                                 void *buffer,
963                                 gsize count,
964                                 int io_priority,
965                                 GCancellable *cancellable,
966                                 GAsyncReadyCallback callback,
967                                 gpointer user_data)
968 {
969   GSimpleAsyncResult *res;
970   ReadData *op;
971   
972   op = g_new (ReadData, 1);
973   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
974   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
975   op->buffer = buffer;
976   op->count_requested = count;
977   
978   g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
979   g_object_unref (res);
980 }
981
982 static gssize
983 g_input_stream_real_read_finish (GInputStream              *stream,
984                                  GAsyncResult              *result,
985                                  GError                   **error)
986 {
987   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
988   ReadData *op;
989
990   g_assert (g_simple_async_result_get_source_tag (simple) == 
991             g_input_stream_real_read_async);
992
993   op = g_simple_async_result_get_op_res_gpointer (simple);
994
995   return op->count_read;
996 }
997
998 typedef struct {
999   gsize count_requested;
1000   gssize count_skipped;
1001 } SkipData;
1002
1003
1004 static void
1005 skip_async_thread (GSimpleAsyncResult *res,
1006                    GObject *object,
1007                    GCancellable *cancellable)
1008 {
1009   SkipData *op;
1010   GInputStreamClass *class;
1011   GError *error = NULL;
1012   
1013   class = G_INPUT_STREAM_GET_CLASS (object);
1014   op = g_simple_async_result_get_op_res_gpointer (res);
1015   op->count_skipped = class->skip (G_INPUT_STREAM (object),
1016                                    op->count_requested,
1017                                    cancellable, &error);
1018   if (op->count_skipped == -1)
1019     {
1020       g_simple_async_result_set_from_error (res, error);
1021       g_error_free (error);
1022     }
1023 }
1024
1025 typedef struct {
1026   char buffer[8192];
1027   gsize count;
1028   gsize count_skipped;
1029   int io_prio;
1030   GCancellable *cancellable;
1031   gpointer user_data;
1032   GAsyncReadyCallback callback;
1033 } SkipFallbackAsyncData;
1034
1035 static void
1036 skip_callback_wrapper (GObject *source_object,
1037                        GAsyncResult *res,
1038                        gpointer user_data)
1039 {
1040   GInputStreamClass *class;
1041   SkipFallbackAsyncData *data = user_data;
1042   SkipData *op;
1043   GSimpleAsyncResult *simple;
1044   GError *error = NULL;
1045   gssize ret;
1046
1047   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1048
1049   if (ret > 0)
1050     {
1051       data->count -= ret;
1052       data->count_skipped += ret;
1053
1054       if (data->count > 0)
1055         {
1056           class = G_INPUT_STREAM_GET_CLASS (source_object);
1057           class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1058                              skip_callback_wrapper, data);
1059           return;
1060         }
1061     }
1062
1063   op = g_new0 (SkipData, 1);
1064   op->count_skipped = data->count_skipped;
1065   simple = g_simple_async_result_new (source_object,
1066                                       data->callback, data->user_data,
1067                                       g_input_stream_real_skip_async);
1068
1069   g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1070
1071   if (ret == -1)
1072     {
1073       if (data->count_skipped && 
1074           error->domain == G_IO_ERROR &&
1075           error->code == G_IO_ERROR_CANCELLED)
1076         { /* No error, return partial read */ }
1077       else
1078         g_simple_async_result_set_from_error (simple, error);
1079       g_error_free (error);
1080     }
1081
1082   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1083   g_simple_async_result_complete (simple);
1084   g_object_unref (simple);
1085   
1086   g_free (data);
1087  }
1088
1089 static void
1090 g_input_stream_real_skip_async (GInputStream        *stream,
1091                                 gsize                count,
1092                                 int                  io_priority,
1093                                 GCancellable        *cancellable,
1094                                 GAsyncReadyCallback  callback,
1095                                 gpointer             user_data)
1096 {
1097   GInputStreamClass *class;
1098   SkipData *op;
1099   SkipFallbackAsyncData *data;
1100   GSimpleAsyncResult *res;
1101
1102   class = G_INPUT_STREAM_GET_CLASS (stream);
1103
1104   if (class->read_async == g_input_stream_real_read_async)
1105     {
1106       /* Read is thread-using async fallback.
1107        * Make skip use threads too, so that we can use a possible sync skip
1108        * implementation. */
1109       op = g_new0 (SkipData, 1);
1110       
1111       res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1112                                        g_input_stream_real_skip_async);
1113
1114       g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1115
1116       op->count_requested = count;
1117
1118       g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1119       g_object_unref (res);
1120     }
1121   else
1122     {
1123       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1124       
1125       /* There is a custom async read function, lets use that. */
1126       data = g_new (SkipFallbackAsyncData, 1);
1127       data->count = count;
1128       data->count_skipped = 0;
1129       data->io_prio = io_priority;
1130       data->cancellable = cancellable;
1131       data->callback = callback;
1132       data->user_data = user_data;
1133       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1134                          skip_callback_wrapper, data);
1135     }
1136
1137 }
1138
1139 static gssize
1140 g_input_stream_real_skip_finish (GInputStream              *stream,
1141                                  GAsyncResult              *result,
1142                                  GError                   **error)
1143 {
1144   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1145   SkipData *op;
1146
1147   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1148   op = g_simple_async_result_get_op_res_gpointer (simple);
1149   return op->count_skipped;
1150 }
1151
1152 static void
1153 close_async_thread (GSimpleAsyncResult *res,
1154                     GObject *object,
1155                     GCancellable *cancellable)
1156 {
1157   GInputStreamClass *class;
1158   GError *error = NULL;
1159   gboolean result;
1160
1161   /* Auto handling of cancelation disabled, and ignore
1162      cancellation, since we want to close things anyway, although
1163      possibly in a quick-n-dirty way. At least we never want to leak
1164      open handles */
1165   
1166   class = G_INPUT_STREAM_GET_CLASS (object);
1167   result = class->close (G_INPUT_STREAM (object), cancellable, &error);
1168   if (!result)
1169     {
1170       g_simple_async_result_set_from_error (res, error);
1171       g_error_free (error);
1172     }
1173 }
1174
1175 static void
1176 g_input_stream_real_close_async (GInputStream        *stream,
1177                                  int                  io_priority,
1178                                  GCancellable        *cancellable,
1179                                  GAsyncReadyCallback  callback,
1180                                  gpointer             user_data)
1181 {
1182   GSimpleAsyncResult *res;
1183   
1184   res = g_simple_async_result_new (G_OBJECT (stream),
1185                                    callback,
1186                                    user_data,
1187                                    g_input_stream_real_close_async);
1188
1189   g_simple_async_result_set_handle_cancellation (res, FALSE);
1190   
1191   g_simple_async_result_run_in_thread (res,
1192                                        close_async_thread,
1193                                        io_priority,
1194                                        cancellable);
1195   g_object_unref (res);
1196 }
1197
1198 static gboolean
1199 g_input_stream_real_close_finish (GInputStream              *stream,
1200                                   GAsyncResult              *result,
1201                                   GError                   **error)
1202 {
1203   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1204   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
1205   return TRUE;
1206 }