gio/ docs/reference/gio Merged gio-standalone into glib.
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gsimpleasyncresult.h"
30
31 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
32
33 struct _GInputStreamPrivate {
34   guint closed : 1;
35   guint pending : 1;
36   GAsyncReadyCallback outstanding_callback;
37 };
38
39 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
40                                                   gsize                 count,
41                                                   GCancellable         *cancellable,
42                                                   GError              **error);
43 static void     g_input_stream_real_read_async   (GInputStream         *stream,
44                                                   void                 *buffer,
45                                                   gsize                 count,
46                                                   int                   io_priority,
47                                                   GCancellable         *cancellable,
48                                                   GAsyncReadyCallback   callback,
49                                                   gpointer              user_data);
50 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
51                                                   GAsyncResult         *result,
52                                                   GError              **error);
53 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
54                                                   gsize                 count,
55                                                   int                   io_priority,
56                                                   GCancellable         *cancellable,
57                                                   GAsyncReadyCallback   callback,
58                                                   gpointer              data);
59 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
60                                                   GAsyncResult         *result,
61                                                   GError              **error);
62 static void     g_input_stream_real_close_async  (GInputStream         *stream,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              data);
67 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70
71 static void
72 g_input_stream_finalize (GObject *object)
73 {
74   GInputStream *stream;
75
76   stream = G_INPUT_STREAM (object);
77   
78   if (!stream->priv->closed)
79     g_input_stream_close (stream, NULL, NULL);
80
81   if (G_OBJECT_CLASS (g_input_stream_parent_class)->finalize)
82     (*G_OBJECT_CLASS (g_input_stream_parent_class)->finalize) (object);
83 }
84
85 static void
86 g_input_stream_dispose (GObject *object)
87 {
88   GInputStream *stream;
89
90   stream = G_INPUT_STREAM (object);
91   
92   if (!stream->priv->closed)
93     g_input_stream_close (stream, NULL, NULL);
94   
95   if (G_OBJECT_CLASS (g_input_stream_parent_class)->dispose)
96     (*G_OBJECT_CLASS (g_input_stream_parent_class)->dispose) (object);
97 }
98
99
100 static void
101 g_input_stream_class_init (GInputStreamClass *klass)
102 {
103   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
104   
105   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
106   
107   gobject_class->finalize = g_input_stream_finalize;
108   gobject_class->dispose = g_input_stream_dispose;
109   
110   klass->skip = g_input_stream_real_skip;
111   klass->read_async = g_input_stream_real_read_async;
112   klass->read_finish = g_input_stream_real_read_finish;
113   klass->skip_async = g_input_stream_real_skip_async;
114   klass->skip_finish = g_input_stream_real_skip_finish;
115   klass->close_async = g_input_stream_real_close_async;
116   klass->close_finish = g_input_stream_real_close_finish;
117 }
118
119 static void
120 g_input_stream_init (GInputStream *stream)
121 {
122   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
123                                               G_TYPE_INPUT_STREAM,
124                                               GInputStreamPrivate);
125 }
126
127 /**
128  * g_input_stream_read:
129  * @stream: a #GInputStream.
130  * @buffer: a buffer to read data into (which should be at least count bytes long).
131  * @count: the number of bytes that will be read from the stream
132  * @cancellable: optional #GCancellable object, %NULL to ignore.
133  * @error: location to store the error occuring, or %NULL to ignore
134  *
135  * Tries to read @count bytes from the stream into the buffer starting at
136  * @buffer. Will block during this read.
137  * 
138  * If count is zero returns zero and does nothing. A value of @count
139  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
140  *
141  * On success, the number of bytes read into the buffer is returned.
142  * It is not an error if this is not the same as the requested size, as it
143  * can happen e.g. near the end of a file. Zero is returned on end of file
144  * (or if @count is zero),  but never otherwise.
145  *
146  * If @cancellable is not NULL, then the operation can be cancelled by
147  * triggering the cancellable object from another thread. If the operation
148  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
149  * operation was partially finished when the operation was cancelled the
150  * partial result will be returned, without an error.
151  *
152  * On error -1 is returned and @error is set accordingly.
153  * 
154  * Return value: Number of bytes read, or -1 on error
155  **/
156 gssize
157 g_input_stream_read  (GInputStream  *stream,
158                       void          *buffer,
159                       gsize          count,
160                       GCancellable  *cancellable,
161                       GError       **error)
162 {
163   GInputStreamClass *class;
164   gssize res;
165
166   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
167   g_return_val_if_fail (buffer != NULL, 0);
168
169   if (count == 0)
170     return 0;
171   
172   if (((gssize) count) < 0)
173     {
174       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
175                    _("Too large count value passed to g_input_stream_read"));
176       return -1;
177     }
178
179   if (stream->priv->closed)
180     {
181       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
182                    _("Stream is already closed"));
183       return -1;
184     }
185   
186   if (stream->priv->pending)
187     {
188       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
189                    _("Stream has outstanding operation"));
190       return -1;
191     }
192   
193   class = G_INPUT_STREAM_GET_CLASS (stream);
194
195   if (class->read == NULL) 
196     {
197       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
198                    _("Input stream doesn't implement read"));
199       return -1;
200     }
201
202   if (cancellable)
203     g_push_current_cancellable (cancellable);
204   
205   stream->priv->pending = TRUE;
206   res = class->read (stream, buffer, count, cancellable, error);
207   stream->priv->pending = FALSE;
208
209   if (cancellable)
210     g_pop_current_cancellable (cancellable);
211   
212   return res;
213 }
214
215 /**
216  * g_input_stream_read_all:
217  * @stream: a #GInputStream.
218  * @buffer: a buffer to read data into (which should be at least count bytes long).
219  * @count: the number of bytes that will be read from the stream
220  * @bytes_read: location to store the number of bytes that was read from the stream
221  * @cancellable: optional #GCancellable object, %NULL to ignore.
222  * @error: location to store the error occuring, or %NULL to ignore
223  *
224  * Tries to read @count bytes from the stream into the buffer starting at
225  * @buffer. Will block during this read.
226  *
227  * This function is similar to g_input_stream_read(), except it tries to
228  * read as many bytes as requested, only stopping on an error or end of stream.
229  *
230  * On a successful read of @count bytes, or if we reached the end of the
231  * stream,  TRUE is returned, and @bytes_read is set to the number of bytes
232  * read into @buffer.
233  * 
234  * If there is an error during the operation FALSE is returned and @error
235  * is set to indicate the error status, @bytes_read is updated to contain
236  * the number of bytes read into @buffer before the error occured.
237  *
238  * Return value: TRUE on success, FALSE if there was an error
239  **/
240 gboolean
241 g_input_stream_read_all (GInputStream              *stream,
242                          void                      *buffer,
243                          gsize                      count,
244                          gsize                     *bytes_read,
245                          GCancellable              *cancellable,
246                          GError                   **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258                                  cancellable, error);
259       if (res == -1)
260         {
261           if (bytes_read)
262             *bytes_read = _bytes_read;
263           return FALSE;
264         }
265       
266       if (res == 0)
267         break;
268
269       _bytes_read += res;
270     }
271
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276
277 /**
278  * g_input_stream_skip:
279  * @stream: a #GInputStream.
280  * @count: the number of bytes that will be skipped from the stream
281  * @cancellable: optional #GCancellable object, %NULL to ignore. 
282  * @error: location to store the error occuring, or %NULL to ignore
283  *
284  * Tries to skip @count bytes from the stream. Will block during the operation.
285  *
286  * This is identical to g_input_stream_read(), from a behaviour standpoint,
287  * but the bytes that are skipped are not returned to the user. Some
288  * streams have an implementation that is more efficient than reading the data.
289  *
290  * This function is optional for inherited classes, as the default implementation
291  * emulates it using read.
292  *
293  * If @cancellable is not %NULL, then the operation can be cancelled by
294  * triggering the cancellable object from another thread. If the operation
295  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
296  * operation was partially finished when the operation was cancelled the
297  * partial result will be returned, without an error.
298  *
299  * Return value: Number of bytes skipped, or -1 on error
300  **/
301 gssize
302 g_input_stream_skip (GInputStream         *stream,
303                      gsize                 count,
304                      GCancellable         *cancellable,
305                      GError              **error)
306 {
307   GInputStreamClass *class;
308   gssize res;
309
310   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
311
312   if (count == 0)
313     return 0;
314
315   if (((gssize) count) < 0)
316     {
317       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
318                    _("Too large count value passed to g_input_stream_skip"));
319       return -1;
320     }
321   
322   if (stream->priv->closed)
323     {
324       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
325                    _("Stream is already closed"));
326       return -1;
327     }
328
329   if (stream->priv->pending)
330     {
331       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
332                    _("Stream has outstanding operation"));
333       return -1;
334     }
335   
336   class = G_INPUT_STREAM_GET_CLASS (stream);
337
338   if (cancellable)
339     g_push_current_cancellable (cancellable);
340   
341   stream->priv->pending = TRUE;
342   res = class->skip (stream, count, cancellable, error);
343   stream->priv->pending = FALSE;
344
345   if (cancellable)
346     g_pop_current_cancellable (cancellable);
347   
348   return res;
349 }
350
351 static gssize
352 g_input_stream_real_skip (GInputStream         *stream,
353                           gsize                 count,
354                           GCancellable         *cancellable,
355                           GError              **error)
356 {
357   GInputStreamClass *class;
358   gssize ret, read_bytes;
359   char buffer[8192];
360   GError *my_error;
361
362   class = G_INPUT_STREAM_GET_CLASS (stream);
363
364   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
365     {
366       if (g_seekable_seek (G_SEEKABLE (stream),
367                            count,
368                            G_SEEK_CUR,
369                            cancellable,
370                            NULL))
371         return count;
372     }
373
374   /* If not seekable, or seek failed, fall back to reading data: */
375
376   class = G_INPUT_STREAM_GET_CLASS (stream);
377   
378   read_bytes = 0;
379   while (1)
380     {
381       my_error = NULL;
382
383       ret = class->read (stream, buffer, MIN (sizeof (buffer), count),
384                          cancellable, &my_error);
385       if (ret == -1)
386         {
387           if (read_bytes > 0 &&
388               my_error->domain == G_IO_ERROR &&
389               my_error->code == G_IO_ERROR_CANCELLED)
390             {
391               g_error_free (my_error);
392               return read_bytes;
393             }
394           
395           g_propagate_error (error, my_error);
396           return -1;
397         }
398
399       count -= ret;
400       read_bytes += ret;
401       
402       if (ret == 0 || count == 0)
403         return read_bytes;
404     }
405 }
406
407 /**
408  * g_input_stream_close:
409  * @stream: A #GInputStream.
410  * @error: location to store the error occuring, or %NULL to ignore
411  *
412  * Closes the stream, releasing resources related to it.
413  *
414  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
415  * Closing a stream multiple times will not return an error.
416  *
417  * Streams will be automatically closed when the last reference
418  * is dropped, but you might want to call make sure resources
419  * are released as early as possible.
420  *
421  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
422  * open after the stream is closed. See the documentation for the individual
423  * stream for details.
424  *
425  * On failure the first error that happened will be reported, but the close
426  * operation will finish as much as possible. A stream that failed to
427  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
428  * is important to check and report the error to the user.
429  *
430  * If @cancellable is not NULL, then the operation can be cancelled by
431  * triggering the cancellable object from another thread. If the operation
432  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
433  * Cancelling a close will still leave the stream closed, but some streams
434  * can use a faster close that doesn't block to e.g. check errors. 
435  *
436  * Return value: %TRUE on success, %FALSE on failure
437  **/
438 gboolean
439 g_input_stream_close (GInputStream  *stream,
440                       GCancellable  *cancellable,
441                       GError       **error)
442 {
443   GInputStreamClass *class;
444   gboolean res;
445
446   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
447
448   class = G_INPUT_STREAM_GET_CLASS (stream);
449
450   if (stream->priv->closed)
451     return TRUE;
452
453   if (stream->priv->pending)
454     {
455       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
456                    _("Stream has outstanding operation"));
457       return FALSE;
458     }
459   
460   res = TRUE;
461
462   stream->priv->pending = TRUE;
463
464   if (cancellable)
465     g_push_current_cancellable (cancellable);
466
467   if (class->close)
468     res = class->close (stream, cancellable, error);
469
470   if (cancellable)
471     g_pop_current_cancellable (cancellable);
472   
473   stream->priv->closed = TRUE;
474   
475   stream->priv->pending = FALSE;
476
477   return res;
478 }
479
480 static void
481 async_ready_callback_wrapper (GObject *source_object,
482                               GAsyncResult *res,
483                               gpointer      user_data)
484 {
485   GInputStream *stream = G_INPUT_STREAM (source_object);
486
487   stream->priv->pending = FALSE;
488   if (stream->priv->outstanding_callback)
489     (*stream->priv->outstanding_callback) (source_object, res, user_data);
490   g_object_unref (stream);
491 }
492
493 static void
494 async_ready_close_callback_wrapper (GObject *source_object,
495                                     GAsyncResult *res,
496                                     gpointer      user_data)
497 {
498   GInputStream *stream = G_INPUT_STREAM (source_object);
499
500   stream->priv->pending = FALSE;
501   stream->priv->closed = TRUE;
502   if (stream->priv->outstanding_callback)
503     (*stream->priv->outstanding_callback) (source_object, res, user_data);
504   g_object_unref (stream);
505 }
506
507 /**
508  * g_input_stream_read_async:
509  * @stream: A #GInputStream.
510  * @buffer: a buffer to read data into (which should be at least count bytes long).
511  * @count: the number of bytes that will be read from the stream
512  * @io_priority: the io priority of the request. the io priority of the request
513  * @cancellable: optional #GCancellable object, %NULL to ignore.
514  * @callback: callback to call when the request is satisfied
515  * @user_data: the data to pass to callback function
516  *
517  * Request an asynchronous read of @count bytes from the stream into the buffer
518  * starting at @buffer. When the operation is finished @callback will be called,
519  * giving the results.
520  *
521  * During an async request no other sync and async calls are allowed, and will
522  * result in %G_IO_ERROR_PENDING errors. 
523  *
524  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
525  *
526  * On success, the number of bytes read into the buffer will be passed to the
527  * callback. It is not an error if this is not the same as the requested size, as it
528  * can happen e.g. near the end of a file, but generally we try to read
529  * as many bytes as requested. Zero is returned on end of file
530  * (or if @count is zero),  but never otherwise.
531  *
532  * Any outstanding i/o request with higher priority (lower numerical value) will
533  * be executed before an outstanding request with lower priority. Default
534  * priority is %G_PRIORITY_DEFAULT.
535  *
536  * The asyncronous methods have a default fallback that uses threads to implement
537  * asynchronicity, so they are optional for inheriting classes. However, if you
538  * override one you must override all.
539  **/
540 void
541 g_input_stream_read_async (GInputStream        *stream,
542                            void                *buffer,
543                            gsize                count,
544                            int                  io_priority,
545                            GCancellable        *cancellable,
546                            GAsyncReadyCallback  callback,
547                            gpointer             user_data)
548 {
549   GInputStreamClass *class;
550   GSimpleAsyncResult *simple;
551
552   g_return_if_fail (G_IS_INPUT_STREAM (stream));
553   g_return_if_fail (buffer != NULL);
554
555   if (count == 0)
556     {
557       simple = g_simple_async_result_new (G_OBJECT (stream),
558                                           callback,
559                                           user_data,
560                                           g_input_stream_read_async);
561       g_simple_async_result_complete_in_idle (simple);
562       g_object_unref (simple);
563       return;
564     }
565   
566   if (((gssize) count) < 0)
567     {
568       g_simple_async_report_error_in_idle (G_OBJECT (stream),
569                                            callback,
570                                            user_data,
571                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
572                                            _("Too large count value passed to g_input_stream_read_async"));
573       return;
574     }
575
576   if (stream->priv->closed)
577     {
578       g_simple_async_report_error_in_idle (G_OBJECT (stream),
579                                            callback,
580                                            user_data,
581                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
582                                            _("Stream is already closed"));
583       return;
584     }
585   
586   if (stream->priv->pending)
587     {
588       g_simple_async_report_error_in_idle (G_OBJECT (stream),
589                                            callback,
590                                            user_data,
591                                            G_IO_ERROR, G_IO_ERROR_PENDING,
592                                            _("Stream has outstanding operation"));
593       return;
594     }
595
596   class = G_INPUT_STREAM_GET_CLASS (stream);
597   
598   stream->priv->pending = TRUE;
599   stream->priv->outstanding_callback = callback;
600   g_object_ref (stream);
601   class->read_async (stream, buffer, count, io_priority, cancellable,
602                      async_ready_callback_wrapper, user_data);
603 }
604
605 /**
606  * g_input_stream_read_finish:
607  * @stream:
608  * @result:
609  * @error: a #GError location to store the error occuring, or %NULL to 
610  * ignore.
611  * Returns: 
612  **/
613 gssize
614 g_input_stream_read_finish (GInputStream              *stream,
615                             GAsyncResult              *result,
616                             GError                   **error)
617 {
618   GSimpleAsyncResult *simple;
619   GInputStreamClass *class;
620   
621   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
622   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
623
624   if (G_IS_SIMPLE_ASYNC_RESULT (result))
625     {
626       simple = G_SIMPLE_ASYNC_RESULT (result);
627       if (g_simple_async_result_propagate_error (simple, error))
628         return -1;
629
630       /* Special case read of 0 bytes */
631       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
632         return 0;
633     }
634
635   class = G_INPUT_STREAM_GET_CLASS (stream);
636   return class->read_finish (stream, result, error);
637 }
638
639 /**
640  * g_input_stream_skip_async:
641  * @stream: A #GInputStream.
642  * @count: the number of bytes that will be skipped from the stream
643  * @io_priority: the io priority of the request. the io priority of the request
644  * @cancellable: optional #GCancellable object, %NULL to ignore. 
645  * @callback: callback to call when the request is satisfied
646  * @user_data: the data to pass to callback function
647  *
648  * Request an asynchronous skip of @count bytes from the stream into the buffer
649  * starting at @buffer. When the operation is finished @callback will be called,
650  * giving the results.
651  *
652  * During an async request no other sync and async calls are allowed, and will
653  * result in %G_IO_ERROR_PENDING errors. 
654  *
655  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
656  *
657  * On success, the number of bytes skipped will be passed to the
658  * callback. It is not an error if this is not the same as the requested size, as it
659  * can happen e.g. near the end of a file, but generally we try to skip
660  * as many bytes as requested. Zero is returned on end of file
661  * (or if @count is zero), but never otherwise.
662  *
663  * Any outstanding i/o request with higher priority (lower numerical value) will
664  * be executed before an outstanding request with lower priority. Default
665  * priority is %G_PRIORITY_DEFAULT.
666  *
667  * The asyncronous methods have a default fallback that uses threads to implement
668  * asynchronicity, so they are optional for inheriting classes. However, if you
669  * override one you must override all.
670  **/
671 void
672 g_input_stream_skip_async (GInputStream        *stream,
673                            gsize                count,
674                            int                  io_priority,
675                            GCancellable        *cancellable,
676                            GAsyncReadyCallback  callback,
677                            gpointer             user_data)
678 {
679   GInputStreamClass *class;
680   GSimpleAsyncResult *simple;
681
682   g_return_if_fail (G_IS_INPUT_STREAM (stream));
683
684   if (count == 0)
685     {
686       simple = g_simple_async_result_new (G_OBJECT (stream),
687                                           callback,
688                                           user_data,
689                                           g_input_stream_skip_async);
690
691       g_simple_async_result_complete_in_idle (simple);
692       g_object_unref (simple);
693       return;
694     }
695   
696   if (((gssize) count) < 0)
697     {
698       g_simple_async_report_error_in_idle (G_OBJECT (stream),
699                                            callback,
700                                            user_data,
701                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
702                                            _("Too large count value passed to g_input_stream_skip_async"));
703       return;
704     }
705
706   if (stream->priv->closed)
707     {
708       g_simple_async_report_error_in_idle (G_OBJECT (stream),
709                                            callback,
710                                            user_data,
711                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
712                                            _("Stream is already closed"));
713       return;
714     }
715   
716   if (stream->priv->pending)
717     {
718       g_simple_async_report_error_in_idle (G_OBJECT (stream),
719                                            callback,
720                                            user_data,
721                                            G_IO_ERROR, G_IO_ERROR_PENDING,
722                                            _("Stream has outstanding operation"));
723       return;
724     }
725
726   class = G_INPUT_STREAM_GET_CLASS (stream);
727   stream->priv->pending = TRUE;
728   stream->priv->outstanding_callback = callback;
729   g_object_ref (stream);
730   class->skip_async (stream, count, io_priority, cancellable,
731                      async_ready_callback_wrapper, user_data);
732 }
733
734 /**
735  * g_input_stream_skip_finish:
736  * @stream:
737  * @result:
738  * @error: a #GError location to store the error occuring, or %NULL to 
739  * ignore.
740  * Returns: 
741  **/
742 gssize
743 g_input_stream_skip_finish (GInputStream              *stream,
744                             GAsyncResult              *result,
745                             GError                   **error)
746 {
747   GSimpleAsyncResult *simple;
748   GInputStreamClass *class;
749
750   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
751   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
752
753   if (G_IS_SIMPLE_ASYNC_RESULT (result))
754     {
755       simple = G_SIMPLE_ASYNC_RESULT (result);
756       if (g_simple_async_result_propagate_error (simple, error))
757         return -1;
758
759       /* Special case skip of 0 bytes */
760       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
761         return 0;
762     }
763
764   class = G_INPUT_STREAM_GET_CLASS (stream);
765   return class->skip_finish (stream, result, error);
766 }
767
768 /**
769  * g_input_stream_close_async:
770  * @stream: A #GInputStream.
771  * @io_priority: the io priority of the request. the io priority of the request
772  * @cancellable: optional cancellable object
773  * @callback: callback to call when the request is satisfied
774  * @user_data: the data to pass to callback function
775  *
776  * Requests an asynchronous closes of the stream, releasing resources related to it.
777  * When the operation is finished @callback will be called, giving the results.
778  *
779  * For behaviour details see g_input_stream_close().
780  *
781  * The asyncronous methods have a default fallback that uses threads to implement
782  * asynchronicity, so they are optional for inheriting classes. However, if you
783  * override one you must override all.
784  **/
785 void
786 g_input_stream_close_async (GInputStream       *stream,
787                             int                 io_priority,
788                             GCancellable       *cancellable,
789                             GAsyncReadyCallback callback,
790                             gpointer            user_data)
791 {
792   GInputStreamClass *class;
793   GSimpleAsyncResult *simple;
794
795   g_return_if_fail (G_IS_INPUT_STREAM (stream));
796
797   if (stream->priv->closed)
798     {
799       simple = g_simple_async_result_new (G_OBJECT (stream),
800                                           callback,
801                                           user_data,
802                                           g_input_stream_close_async);
803
804       g_simple_async_result_complete_in_idle (simple);
805       g_object_unref (simple);
806       return;
807     }
808
809   if (stream->priv->pending)
810     {
811       g_simple_async_report_error_in_idle (G_OBJECT (stream),
812                                            callback,
813                                            user_data,
814                                            G_IO_ERROR, G_IO_ERROR_PENDING,
815                                            _("Stream has outstanding operation"));
816       return;
817     }
818   
819   class = G_INPUT_STREAM_GET_CLASS (stream);
820   stream->priv->pending = TRUE;
821   stream->priv->outstanding_callback = callback;
822   g_object_ref (stream);
823   class->close_async (stream, io_priority, cancellable,
824                       async_ready_close_callback_wrapper, user_data);
825 }
826
827 /**
828  * g_input_stream_close_finish:
829  * @stream:
830  * @result:
831  * @error: a #GError location to store the error occuring, or %NULL to 
832  * ignore.
833  * Returns: 
834  **/
835 gboolean
836 g_input_stream_close_finish (GInputStream              *stream,
837                              GAsyncResult              *result,
838                              GError                   **error)
839 {
840   GSimpleAsyncResult *simple;
841   GInputStreamClass *class;
842
843   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
844   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
845
846   if (G_IS_SIMPLE_ASYNC_RESULT (result))
847     {
848       simple = G_SIMPLE_ASYNC_RESULT (result);
849       if (g_simple_async_result_propagate_error (simple, error))
850         return FALSE;
851
852       /* Special case already closed */
853       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
854         return TRUE;
855     }
856
857   class = G_INPUT_STREAM_GET_CLASS (stream);
858   return class->close_finish (stream, result, error);
859 }
860
861 /**
862  * g_input_stream_is_closed:
863  * @stream: input stream.
864  * 
865  * Returns: %TRUE if the stream is closed.
866  **/
867 gboolean
868 g_input_stream_is_closed (GInputStream *stream)
869 {
870   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
871   
872   return stream->priv->closed;
873 }
874  
875 /**
876  * g_input_stream_has_pending:
877  * @stream: input stream.
878  * 
879  * Returns: %TRUE if @stream has pending actions.
880  **/  
881 gboolean
882 g_input_stream_has_pending (GInputStream *stream)
883 {
884   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
885   
886   return stream->priv->pending;
887 }
888
889 /**
890  * g_input_stream_set_pending:
891  * @stream: input stream
892  * @pending: boolean.
893  * 
894  * Sets @stream has actions pending.
895  **/
896 void
897 g_input_stream_set_pending (GInputStream              *stream,
898                             gboolean                   pending)
899 {
900   g_return_if_fail (G_IS_INPUT_STREAM (stream));
901   
902   stream->priv->pending = pending;
903 }
904
905 /********************************************
906  *   Default implementation of async ops    *
907  ********************************************/
908
909 typedef struct {
910   void              *buffer;
911   gsize              count_requested;
912   gssize             count_read;
913 } ReadData;
914
915 static void
916 read_async_thread (GSimpleAsyncResult *res,
917                    GObject *object,
918                    GCancellable *cancellable)
919 {
920   ReadData *op;
921   GInputStreamClass *class;
922   GError *error = NULL;
923  
924   op = g_simple_async_result_get_op_res_gpointer (res);
925
926   class = G_INPUT_STREAM_GET_CLASS (object);
927
928   op->count_read = class->read (G_INPUT_STREAM (object),
929                                 op->buffer, op->count_requested,
930                                 cancellable, &error);
931   if (op->count_read == -1)
932     {
933       g_simple_async_result_set_from_error (res, error);
934       g_error_free (error);
935     }
936 }
937
938 static void
939 g_input_stream_real_read_async (GInputStream *stream,
940                                 void *buffer,
941                                 gsize count,
942                                 int io_priority,
943                                 GCancellable *cancellable,
944                                 GAsyncReadyCallback callback,
945                                 gpointer user_data)
946 {
947   GSimpleAsyncResult *res;
948   ReadData *op;
949   
950   op = g_new (ReadData, 1);
951   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
952   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
953   op->buffer = buffer;
954   op->count_requested = count;
955   
956   g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
957   g_object_unref (res);
958 }
959
960 static gssize
961 g_input_stream_real_read_finish (GInputStream              *stream,
962                                  GAsyncResult              *result,
963                                  GError                   **error)
964 {
965   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
966   ReadData *op;
967
968   g_assert (g_simple_async_result_get_source_tag (simple) == 
969             g_input_stream_real_read_async);
970
971   op = g_simple_async_result_get_op_res_gpointer (simple);
972
973   return op->count_read;
974 }
975
976 typedef struct {
977   gsize count_requested;
978   gssize count_skipped;
979 } SkipData;
980
981
982 static void
983 skip_async_thread (GSimpleAsyncResult *res,
984                    GObject *object,
985                    GCancellable *cancellable)
986 {
987   SkipData *op;
988   GInputStreamClass *class;
989   GError *error = NULL;
990   
991   class = G_INPUT_STREAM_GET_CLASS (object);
992   op = g_simple_async_result_get_op_res_gpointer (res);
993   op->count_skipped = class->skip (G_INPUT_STREAM (object),
994                                    op->count_requested,
995                                    cancellable, &error);
996   if (op->count_skipped == -1)
997     {
998       g_simple_async_result_set_from_error (res, error);
999       g_error_free (error);
1000     }
1001 }
1002
1003 typedef struct {
1004   char buffer[8192];
1005   gsize count;
1006   gsize count_skipped;
1007   int io_prio;
1008   GCancellable *cancellable;
1009   gpointer user_data;
1010   GAsyncReadyCallback callback;
1011 } SkipFallbackAsyncData;
1012
1013 static void
1014 skip_callback_wrapper (GObject *source_object,
1015                        GAsyncResult *res,
1016                        gpointer user_data)
1017 {
1018   GInputStreamClass *class;
1019   SkipFallbackAsyncData *data = user_data;
1020   SkipData *op;
1021   GSimpleAsyncResult *simple;
1022   GError *error = NULL;
1023   gssize ret;
1024
1025   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1026
1027   if (ret > 0)
1028     {
1029       data->count -= ret;
1030       data->count_skipped += ret;
1031
1032       if (data->count > 0)
1033         {
1034           class = G_INPUT_STREAM_GET_CLASS (source_object);
1035           class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1036                              skip_callback_wrapper, data);
1037           return;
1038         }
1039     }
1040
1041   op = g_new0 (SkipData, 1);
1042   op->count_skipped = data->count_skipped;
1043   simple = g_simple_async_result_new (source_object,
1044                                       data->callback, data->user_data,
1045                                       g_input_stream_real_skip_async);
1046
1047   g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1048
1049   if (ret == -1)
1050     {
1051       if (data->count_skipped && 
1052           error->domain == G_IO_ERROR &&
1053           error->code == G_IO_ERROR_CANCELLED)
1054         { /* No error, return partial read */ }
1055       else
1056         g_simple_async_result_set_from_error (simple, error);
1057       g_error_free (error);
1058     }
1059
1060   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1061   g_simple_async_result_complete (simple);
1062   g_object_unref (simple);
1063   
1064   g_free (data);
1065  }
1066
1067 static void
1068 g_input_stream_real_skip_async (GInputStream        *stream,
1069                                 gsize                count,
1070                                 int                  io_priority,
1071                                 GCancellable        *cancellable,
1072                                 GAsyncReadyCallback  callback,
1073                                 gpointer             user_data)
1074 {
1075   GInputStreamClass *class;
1076   SkipData *op;
1077   SkipFallbackAsyncData *data;
1078   GSimpleAsyncResult *res;
1079
1080   class = G_INPUT_STREAM_GET_CLASS (stream);
1081
1082   if (class->read_async == g_input_stream_real_read_async)
1083     {
1084       /* Read is thread-using async fallback.
1085        * Make skip use threads too, so that we can use a possible sync skip
1086        * implementation. */
1087       op = g_new0 (SkipData, 1);
1088       
1089       res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1090                                        g_input_stream_real_skip_async);
1091
1092       g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1093
1094       op->count_requested = count;
1095
1096       g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1097       g_object_unref (res);
1098     }
1099   else
1100     {
1101       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1102       
1103       /* There is a custom async read function, lets use that. */
1104       data = g_new (SkipFallbackAsyncData, 1);
1105       data->count = count;
1106       data->count_skipped = 0;
1107       data->io_prio = io_priority;
1108       data->cancellable = cancellable;
1109       data->callback = callback;
1110       data->user_data = user_data;
1111       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1112                          skip_callback_wrapper, data);
1113     }
1114
1115 }
1116
1117 static gssize
1118 g_input_stream_real_skip_finish (GInputStream              *stream,
1119                                  GAsyncResult              *result,
1120                                  GError                   **error)
1121 {
1122   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1123   SkipData *op;
1124
1125   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1126   op = g_simple_async_result_get_op_res_gpointer (simple);
1127   return op->count_skipped;
1128 }
1129
1130 static void
1131 close_async_thread (GSimpleAsyncResult *res,
1132                     GObject *object,
1133                     GCancellable *cancellable)
1134 {
1135   GInputStreamClass *class;
1136   GError *error = NULL;
1137   gboolean result;
1138
1139   /* Auto handling of cancelation disabled, and ignore
1140      cancellation, since we want to close things anyway, although
1141      possibly in a quick-n-dirty way. At least we never want to leak
1142      open handles */
1143   
1144   class = G_INPUT_STREAM_GET_CLASS (object);
1145   result = class->close (G_INPUT_STREAM (object), cancellable, &error);
1146   if (!result)
1147     {
1148       g_simple_async_result_set_from_error (res, error);
1149       g_error_free (error);
1150     }
1151 }
1152
1153 static void
1154 g_input_stream_real_close_async (GInputStream        *stream,
1155                                  int                  io_priority,
1156                                  GCancellable        *cancellable,
1157                                  GAsyncReadyCallback  callback,
1158                                  gpointer             user_data)
1159 {
1160   GSimpleAsyncResult *res;
1161   
1162   res = g_simple_async_result_new (G_OBJECT (stream),
1163                                    callback,
1164                                    user_data,
1165                                    g_input_stream_real_close_async);
1166
1167   g_simple_async_result_set_handle_cancellation (res, FALSE);
1168   
1169   g_simple_async_result_run_in_thread (res,
1170                                        close_async_thread,
1171                                        io_priority,
1172                                        cancellable);
1173   g_object_unref (res);
1174 }
1175
1176 static gboolean
1177 g_input_stream_real_close_finish (GInputStream              *stream,
1178                                   GAsyncResult              *result,
1179                                   GError                   **error)
1180 {
1181   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1182   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
1183   return TRUE;
1184 }