g_push/pop_current_cancellable -> g_cancellable_push/pop_current
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gsimpleasyncresult.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  *
37  * 
38  * 
39  **/
40
41 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
42
43 struct _GInputStreamPrivate {
44   guint closed : 1;
45   guint pending : 1;
46   GAsyncReadyCallback outstanding_callback;
47 };
48
49 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
50                                                   gsize                 count,
51                                                   GCancellable         *cancellable,
52                                                   GError              **error);
53 static void     g_input_stream_real_read_async   (GInputStream         *stream,
54                                                   void                 *buffer,
55                                                   gsize                 count,
56                                                   int                   io_priority,
57                                                   GCancellable         *cancellable,
58                                                   GAsyncReadyCallback   callback,
59                                                   gpointer              user_data);
60 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
61                                                   GAsyncResult         *result,
62                                                   GError              **error);
63 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
64                                                   gsize                 count,
65                                                   int                   io_priority,
66                                                   GCancellable         *cancellable,
67                                                   GAsyncReadyCallback   callback,
68                                                   gpointer              data);
69 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
70                                                   GAsyncResult         *result,
71                                                   GError              **error);
72 static void     g_input_stream_real_close_async  (GInputStream         *stream,
73                                                   int                   io_priority,
74                                                   GCancellable         *cancellable,
75                                                   GAsyncReadyCallback   callback,
76                                                   gpointer              data);
77 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
78                                                   GAsyncResult         *result,
79                                                   GError              **error);
80
81 static void
82 g_input_stream_finalize (GObject *object)
83 {
84   GInputStream *stream;
85
86   stream = G_INPUT_STREAM (object);
87   
88   if (!stream->priv->closed)
89     g_input_stream_close (stream, NULL, NULL);
90
91   if (G_OBJECT_CLASS (g_input_stream_parent_class)->finalize)
92     (*G_OBJECT_CLASS (g_input_stream_parent_class)->finalize) (object);
93 }
94
95 static void
96 g_input_stream_dispose (GObject *object)
97 {
98   GInputStream *stream;
99
100   stream = G_INPUT_STREAM (object);
101   
102   if (!stream->priv->closed)
103     g_input_stream_close (stream, NULL, NULL);
104   
105   if (G_OBJECT_CLASS (g_input_stream_parent_class)->dispose)
106     (*G_OBJECT_CLASS (g_input_stream_parent_class)->dispose) (object);
107 }
108
109
110 static void
111 g_input_stream_class_init (GInputStreamClass *klass)
112 {
113   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
114   
115   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
116   
117   gobject_class->finalize = g_input_stream_finalize;
118   gobject_class->dispose = g_input_stream_dispose;
119   
120   klass->skip = g_input_stream_real_skip;
121   klass->read_async = g_input_stream_real_read_async;
122   klass->read_finish = g_input_stream_real_read_finish;
123   klass->skip_async = g_input_stream_real_skip_async;
124   klass->skip_finish = g_input_stream_real_skip_finish;
125   klass->close_async = g_input_stream_real_close_async;
126   klass->close_finish = g_input_stream_real_close_finish;
127 }
128
129 static void
130 g_input_stream_init (GInputStream *stream)
131 {
132   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
133                                               G_TYPE_INPUT_STREAM,
134                                               GInputStreamPrivate);
135 }
136
137 /**
138  * g_input_stream_read:
139  * @stream: a #GInputStream.
140  * @buffer: a buffer to read data into (which should be at least count bytes long).
141  * @count: the number of bytes that will be read from the stream
142  * @cancellable: optional #GCancellable object, %NULL to ignore.
143  * @error: location to store the error occuring, or %NULL to ignore
144  *
145  * Tries to read @count bytes from the stream into the buffer starting at
146  * @buffer. Will block during this read.
147  * 
148  * If count is zero returns zero and does nothing. A value of @count
149  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
150  *
151  * On success, the number of bytes read into the buffer is returned.
152  * It is not an error if this is not the same as the requested size, as it
153  * can happen e.g. near the end of a file. Zero is returned on end of file
154  * (or if @count is zero),  but never otherwise.
155  *
156  * If @cancellable is not NULL, then the operation can be cancelled by
157  * triggering the cancellable object from another thread. If the operation
158  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
159  * operation was partially finished when the operation was cancelled the
160  * partial result will be returned, without an error.
161  *
162  * On error -1 is returned and @error is set accordingly.
163  * 
164  * Return value: Number of bytes read, or -1 on error
165  **/
166 gssize
167 g_input_stream_read  (GInputStream  *stream,
168                       void          *buffer,
169                       gsize          count,
170                       GCancellable  *cancellable,
171                       GError       **error)
172 {
173   GInputStreamClass *class;
174   gssize res;
175
176   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
177   g_return_val_if_fail (buffer != NULL, 0);
178
179   if (count == 0)
180     return 0;
181   
182   if (((gssize) count) < 0)
183     {
184       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
185                    _("Too large count value passed to g_input_stream_read"));
186       return -1;
187     }
188
189   class = G_INPUT_STREAM_GET_CLASS (stream);
190
191   if (class->read_fn == NULL) 
192     {
193       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
194                    _("Input stream doesn't implement read"));
195       return -1;
196     }
197
198   if (!g_input_stream_set_pending (stream, error))
199     return -1;
200
201   if (cancellable)
202     g_cancellable_push_current (cancellable);
203   
204   res = class->read_fn (stream, buffer, count, cancellable, error);
205
206   if (cancellable)
207     g_cancellable_pop_current (cancellable);
208   
209   g_input_stream_clear_pending (stream);
210
211   return res;
212 }
213
214 /**
215  * g_input_stream_read_all:
216  * @stream: a #GInputStream.
217  * @buffer: a buffer to read data into (which should be at least count bytes long).
218  * @count: the number of bytes that will be read from the stream
219  * @bytes_read: location to store the number of bytes that was read from the stream
220  * @cancellable: optional #GCancellable object, %NULL to ignore.
221  * @error: location to store the error occuring, or %NULL to ignore
222  *
223  * Tries to read @count bytes from the stream into the buffer starting at
224  * @buffer. Will block during this read.
225  *
226  * This function is similar to g_input_stream_read(), except it tries to
227  * read as many bytes as requested, only stopping on an error or end of stream.
228  *
229  * On a successful read of @count bytes, or if we reached the end of the
230  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
231  * read into @buffer.
232  * 
233  * If there is an error during the operation %FALSE is returned and @error
234  * is set to indicate the error status, @bytes_read is updated to contain
235  * the number of bytes read into @buffer before the error occured.
236  *
237  * Return value: %TRUE on success, %FALSE if there was an error
238  **/
239 gboolean
240 g_input_stream_read_all (GInputStream  *stream,
241                          void          *buffer,
242                          gsize          count,
243                          gsize         *bytes_read,
244                          GCancellable  *cancellable,
245                          GError       **error)
246 {
247   gsize _bytes_read;
248   gssize res;
249
250   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
251   g_return_val_if_fail (buffer != NULL, FALSE);
252
253   _bytes_read = 0;
254   while (_bytes_read < count)
255     {
256       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
257                                  cancellable, error);
258       if (res == -1)
259         {
260           if (bytes_read)
261             *bytes_read = _bytes_read;
262           return FALSE;
263         }
264       
265       if (res == 0)
266         break;
267
268       _bytes_read += res;
269     }
270
271   if (bytes_read)
272     *bytes_read = _bytes_read;
273   return TRUE;
274 }
275
276 /**
277  * g_input_stream_skip:
278  * @stream: a #GInputStream.
279  * @count: the number of bytes that will be skipped from the stream
280  * @cancellable: optional #GCancellable object, %NULL to ignore. 
281  * @error: location to store the error occuring, or %NULL to ignore
282  *
283  * Tries to skip @count bytes from the stream. Will block during the operation.
284  *
285  * This is identical to g_input_stream_read(), from a behaviour standpoint,
286  * but the bytes that are skipped are not returned to the user. Some
287  * streams have an implementation that is more efficient than reading the data.
288  *
289  * This function is optional for inherited classes, as the default implementation
290  * emulates it using read.
291  *
292  * If @cancellable is not %NULL, then the operation can be cancelled by
293  * triggering the cancellable object from another thread. If the operation
294  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
295  * operation was partially finished when the operation was cancelled the
296  * partial result will be returned, without an error.
297  *
298  * Return value: Number of bytes skipped, or -1 on error
299  **/
300 gssize
301 g_input_stream_skip (GInputStream  *stream,
302                      gsize          count,
303                      GCancellable  *cancellable,
304                      GError       **error)
305 {
306   GInputStreamClass *class;
307   gssize res;
308
309   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
310
311   if (count == 0)
312     return 0;
313
314   if (((gssize) count) < 0)
315     {
316       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
317                    _("Too large count value passed to g_input_stream_skip"));
318       return -1;
319     }
320   
321   class = G_INPUT_STREAM_GET_CLASS (stream);
322
323   if (!g_input_stream_set_pending (stream, error))
324     return -1;
325
326   if (cancellable)
327     g_cancellable_push_current (cancellable);
328   
329   res = class->skip (stream, count, cancellable, error);
330
331   if (cancellable)
332     g_cancellable_pop_current (cancellable);
333   
334   g_input_stream_clear_pending (stream);
335
336   return res;
337 }
338
339 static gssize
340 g_input_stream_real_skip (GInputStream  *stream,
341                           gsize          count,
342                           GCancellable  *cancellable,
343                           GError       **error)
344 {
345   GInputStreamClass *class;
346   gssize ret, read_bytes;
347   char buffer[8192];
348   GError *my_error;
349
350   class = G_INPUT_STREAM_GET_CLASS (stream);
351
352   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
353     {
354       if (g_seekable_seek (G_SEEKABLE (stream),
355                            count,
356                            G_SEEK_CUR,
357                            cancellable,
358                            NULL))
359         return count;
360     }
361
362   /* If not seekable, or seek failed, fall back to reading data: */
363
364   class = G_INPUT_STREAM_GET_CLASS (stream);
365   
366   read_bytes = 0;
367   while (1)
368     {
369       my_error = NULL;
370
371       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
372                          cancellable, &my_error);
373       if (ret == -1)
374         {
375           if (read_bytes > 0 &&
376               my_error->domain == G_IO_ERROR &&
377               my_error->code == G_IO_ERROR_CANCELLED)
378             {
379               g_error_free (my_error);
380               return read_bytes;
381             }
382           
383           g_propagate_error (error, my_error);
384           return -1;
385         }
386
387       count -= ret;
388       read_bytes += ret;
389       
390       if (ret == 0 || count == 0)
391         return read_bytes;
392     }
393 }
394
395 /**
396  * g_input_stream_close:
397  * @stream: A #GInputStream.
398  * @cancellable: optional #GCancellable object, %NULL to ignore.
399  * @error: location to store the error occuring, or %NULL to ignore
400  *
401  * Closes the stream, releasing resources related to it.
402  *
403  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
404  * Closing a stream multiple times will not return an error.
405  *
406  * Streams will be automatically closed when the last reference
407  * is dropped, but you might want to call make sure resources
408  * are released as early as possible.
409  *
410  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
411  * open after the stream is closed. See the documentation for the individual
412  * stream for details.
413  *
414  * On failure the first error that happened will be reported, but the close
415  * operation will finish as much as possible. A stream that failed to
416  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
417  * is important to check and report the error to the user.
418  *
419  * If @cancellable is not NULL, then the operation can be cancelled by
420  * triggering the cancellable object from another thread. If the operation
421  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
422  * Cancelling a close will still leave the stream closed, but some streams
423  * can use a faster close that doesn't block to e.g. check errors. 
424  *
425  * Return value: %TRUE on success, %FALSE on failure
426  **/
427 gboolean
428 g_input_stream_close (GInputStream  *stream,
429                       GCancellable  *cancellable,
430                       GError       **error)
431 {
432   GInputStreamClass *class;
433   gboolean res;
434
435   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
436
437   class = G_INPUT_STREAM_GET_CLASS (stream);
438
439   if (stream->priv->closed)
440     return TRUE;
441
442   res = TRUE;
443
444   if (!g_input_stream_set_pending (stream, error))
445     return FALSE;
446
447   if (cancellable)
448     g_cancellable_push_current (cancellable);
449
450   if (class->close_fn)
451     res = class->close_fn (stream, cancellable, error);
452
453   if (cancellable)
454     g_cancellable_pop_current (cancellable);
455
456   g_input_stream_clear_pending (stream);
457   
458   stream->priv->closed = TRUE;
459   
460   return res;
461 }
462
463 static void
464 async_ready_callback_wrapper (GObject      *source_object,
465                               GAsyncResult *res,
466                               gpointer      user_data)
467 {
468   GInputStream *stream = G_INPUT_STREAM (source_object);
469
470   g_input_stream_clear_pending (stream);
471   if (stream->priv->outstanding_callback)
472     (*stream->priv->outstanding_callback) (source_object, res, user_data);
473   g_object_unref (stream);
474 }
475
476 static void
477 async_ready_close_callback_wrapper (GObject      *source_object,
478                                     GAsyncResult *res,
479                                     gpointer      user_data)
480 {
481   GInputStream *stream = G_INPUT_STREAM (source_object);
482
483   g_input_stream_clear_pending (stream);
484   stream->priv->closed = TRUE;
485   if (stream->priv->outstanding_callback)
486     (*stream->priv->outstanding_callback) (source_object, res, user_data);
487   g_object_unref (stream);
488 }
489
490 /**
491  * g_input_stream_read_async:
492  * @stream: A #GInputStream.
493  * @buffer: a buffer to read data into (which should be at least count bytes long).
494  * @count: the number of bytes that will be read from the stream
495  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
496  * of the request. 
497  * @cancellable: optional #GCancellable object, %NULL to ignore.
498  * @callback: callback to call when the request is satisfied
499  * @user_data: the data to pass to callback function
500  *
501  * Request an asynchronous read of @count bytes from the stream into the buffer
502  * starting at @buffer. When the operation is finished @callback will be called,
503  * giving the results.
504  *
505  * During an async request no other sync and async calls are allowed, and will
506  * result in %G_IO_ERROR_PENDING errors. 
507  *
508  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
509  *
510  * On success, the number of bytes read into the buffer will be passed to the
511  * callback. It is not an error if this is not the same as the requested size, as it
512  * can happen e.g. near the end of a file, but generally we try to read
513  * as many bytes as requested. Zero is returned on end of file
514  * (or if @count is zero),  but never otherwise.
515  *
516  * Any outstanding i/o request with higher priority (lower numerical value) will
517  * be executed before an outstanding request with lower priority. Default
518  * priority is %G_PRIORITY_DEFAULT.
519  *
520  * The asyncronous methods have a default fallback that uses threads to implement
521  * asynchronicity, so they are optional for inheriting classes. However, if you
522  * override one you must override all.
523  **/
524 void
525 g_input_stream_read_async (GInputStream        *stream,
526                            void                *buffer,
527                            gsize                count,
528                            int                  io_priority,
529                            GCancellable        *cancellable,
530                            GAsyncReadyCallback  callback,
531                            gpointer             user_data)
532 {
533   GInputStreamClass *class;
534   GSimpleAsyncResult *simple;
535   GError *error = NULL;
536
537   g_return_if_fail (G_IS_INPUT_STREAM (stream));
538   g_return_if_fail (buffer != NULL);
539
540   if (count == 0)
541     {
542       simple = g_simple_async_result_new (G_OBJECT (stream),
543                                           callback,
544                                           user_data,
545                                           g_input_stream_read_async);
546       g_simple_async_result_complete_in_idle (simple);
547       g_object_unref (simple);
548       return;
549     }
550   
551   if (((gssize) count) < 0)
552     {
553       g_simple_async_report_error_in_idle (G_OBJECT (stream),
554                                            callback,
555                                            user_data,
556                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
557                                            _("Too large count value passed to g_input_stream_read_async"));
558       return;
559     }
560
561   if (!g_input_stream_set_pending (stream, &error))
562     {
563       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
564                                             callback,
565                                             user_data,
566                                             error);
567       g_error_free (error);
568       return;
569     }
570
571   class = G_INPUT_STREAM_GET_CLASS (stream);
572   stream->priv->outstanding_callback = callback;
573   g_object_ref (stream);
574   class->read_async (stream, buffer, count, io_priority, cancellable,
575                      async_ready_callback_wrapper, user_data);
576 }
577
578 /**
579  * g_input_stream_read_finish:
580  * @stream: a #GInputStream.
581  * @result: a #GAsyncResult.
582  * @error: a #GError location to store the error occuring, or %NULL to 
583  * ignore.
584  * 
585  * Finishes an asynchronous stream read operation. 
586  * 
587  * Returns: number of bytes read in, or -1 on error.
588  **/
589 gssize
590 g_input_stream_read_finish (GInputStream  *stream,
591                             GAsyncResult  *result,
592                             GError       **error)
593 {
594   GSimpleAsyncResult *simple;
595   GInputStreamClass *class;
596   
597   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
598   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
599
600   if (G_IS_SIMPLE_ASYNC_RESULT (result))
601     {
602       simple = G_SIMPLE_ASYNC_RESULT (result);
603       if (g_simple_async_result_propagate_error (simple, error))
604         return -1;
605
606       /* Special case read of 0 bytes */
607       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
608         return 0;
609     }
610
611   class = G_INPUT_STREAM_GET_CLASS (stream);
612   return class->read_finish (stream, result, error);
613 }
614
615 /**
616  * g_input_stream_skip_async:
617  * @stream: A #GInputStream.
618  * @count: the number of bytes that will be skipped from the stream
619  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
620  * of the request. 
621  * @cancellable: optional #GCancellable object, %NULL to ignore. 
622  * @callback: callback to call when the request is satisfied
623  * @user_data: the data to pass to callback function
624  *
625  * Request an asynchronous skip of @count bytes from the stream into the buffer
626  * starting at @buffer. When the operation is finished @callback will be called,
627  * giving the results.
628  *
629  * During an async request no other sync and async calls are allowed, and will
630  * result in %G_IO_ERROR_PENDING errors. 
631  *
632  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
633  *
634  * On success, the number of bytes skipped will be passed to the
635  * callback. It is not an error if this is not the same as the requested size, as it
636  * can happen e.g. near the end of a file, but generally we try to skip
637  * as many bytes as requested. Zero is returned on end of file
638  * (or if @count is zero), but never otherwise.
639  *
640  * Any outstanding i/o request with higher priority (lower numerical value) will
641  * be executed before an outstanding request with lower priority. Default
642  * priority is %G_PRIORITY_DEFAULT.
643  *
644  * The asyncronous methods have a default fallback that uses threads to implement
645  * asynchronicity, so they are optional for inheriting classes. However, if you
646  * override one you must override all.
647  **/
648 void
649 g_input_stream_skip_async (GInputStream        *stream,
650                            gsize                count,
651                            int                  io_priority,
652                            GCancellable        *cancellable,
653                            GAsyncReadyCallback  callback,
654                            gpointer             user_data)
655 {
656   GInputStreamClass *class;
657   GSimpleAsyncResult *simple;
658   GError *error = NULL;
659
660   g_return_if_fail (G_IS_INPUT_STREAM (stream));
661
662   if (count == 0)
663     {
664       simple = g_simple_async_result_new (G_OBJECT (stream),
665                                           callback,
666                                           user_data,
667                                           g_input_stream_skip_async);
668
669       g_simple_async_result_complete_in_idle (simple);
670       g_object_unref (simple);
671       return;
672     }
673   
674   if (((gssize) count) < 0)
675     {
676       g_simple_async_report_error_in_idle (G_OBJECT (stream),
677                                            callback,
678                                            user_data,
679                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
680                                            _("Too large count value passed to g_input_stream_skip_async"));
681       return;
682     }
683
684   if (!g_input_stream_set_pending (stream, &error))
685     {
686       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
687                                             callback,
688                                             user_data,
689                                             error);
690       g_error_free (error);
691       return;
692     }
693
694   class = G_INPUT_STREAM_GET_CLASS (stream);
695   stream->priv->outstanding_callback = callback;
696   g_object_ref (stream);
697   class->skip_async (stream, count, io_priority, cancellable,
698                      async_ready_callback_wrapper, user_data);
699 }
700
701 /**
702  * g_input_stream_skip_finish:
703  * @stream: a #GInputStream.
704  * @result: a #GAsyncResult.
705  * @error: a #GError location to store the error occuring, or %NULL to 
706  * ignore.
707  * 
708  * Finishes a stream skip operation.
709  * 
710  * Returns: the size of the bytes skipped, or %-1 on error.
711  **/
712 gssize
713 g_input_stream_skip_finish (GInputStream  *stream,
714                             GAsyncResult  *result,
715                             GError       **error)
716 {
717   GSimpleAsyncResult *simple;
718   GInputStreamClass *class;
719
720   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
721   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
722
723   if (G_IS_SIMPLE_ASYNC_RESULT (result))
724     {
725       simple = G_SIMPLE_ASYNC_RESULT (result);
726       if (g_simple_async_result_propagate_error (simple, error))
727         return -1;
728
729       /* Special case skip of 0 bytes */
730       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
731         return 0;
732     }
733
734   class = G_INPUT_STREAM_GET_CLASS (stream);
735   return class->skip_finish (stream, result, error);
736 }
737
738 /**
739  * g_input_stream_close_async:
740  * @stream: A #GInputStream.
741  * @io_priority: the <link linkend="io-priority">I/O priority</link> 
742  * of the request. 
743  * @cancellable: optional cancellable object
744  * @callback: callback to call when the request is satisfied
745  * @user_data: the data to pass to callback function
746  *
747  * Requests an asynchronous closes of the stream, releasing resources related to it.
748  * When the operation is finished @callback will be called, giving the results.
749  *
750  * For behaviour details see g_input_stream_close().
751  *
752  * The asyncronous methods have a default fallback that uses threads to implement
753  * asynchronicity, so they are optional for inheriting classes. However, if you
754  * override one you must override all.
755  **/
756 void
757 g_input_stream_close_async (GInputStream        *stream,
758                             int                  io_priority,
759                             GCancellable        *cancellable,
760                             GAsyncReadyCallback  callback,
761                             gpointer             user_data)
762 {
763   GInputStreamClass *class;
764   GSimpleAsyncResult *simple;
765   GError *error = NULL;
766
767   g_return_if_fail (G_IS_INPUT_STREAM (stream));
768
769   if (stream->priv->closed)
770     {
771       simple = g_simple_async_result_new (G_OBJECT (stream),
772                                           callback,
773                                           user_data,
774                                           g_input_stream_close_async);
775
776       g_simple_async_result_complete_in_idle (simple);
777       g_object_unref (simple);
778       return;
779     }
780
781   if (!g_input_stream_set_pending (stream, &error))
782     {
783       g_simple_async_report_gerror_in_idle (G_OBJECT (stream),
784                                             callback,
785                                             user_data,
786                                             error);
787       g_error_free (error);
788       return;
789     }
790   
791   class = G_INPUT_STREAM_GET_CLASS (stream);
792   stream->priv->outstanding_callback = callback;
793   g_object_ref (stream);
794   class->close_async (stream, io_priority, cancellable,
795                       async_ready_close_callback_wrapper, user_data);
796 }
797
798 /**
799  * g_input_stream_close_finish:
800  * @stream: a #GInputStream.
801  * @result: a #GAsyncResult.
802  * @error: a #GError location to store the error occuring, or %NULL to 
803  * ignore.
804  * 
805  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
806  * 
807  * Returns: %TRUE if the stream was closed successfully.
808  **/
809 gboolean
810 g_input_stream_close_finish (GInputStream  *stream,
811                              GAsyncResult  *result,
812                              GError       **error)
813 {
814   GSimpleAsyncResult *simple;
815   GInputStreamClass *class;
816
817   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
818   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
819
820   if (G_IS_SIMPLE_ASYNC_RESULT (result))
821     {
822       simple = G_SIMPLE_ASYNC_RESULT (result);
823       if (g_simple_async_result_propagate_error (simple, error))
824         return FALSE;
825
826       /* Special case already closed */
827       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
828         return TRUE;
829     }
830
831   class = G_INPUT_STREAM_GET_CLASS (stream);
832   return class->close_finish (stream, result, error);
833 }
834
835 /**
836  * g_input_stream_is_closed:
837  * @stream: input stream.
838  * 
839  * Checks if an input stream is closed.
840  * 
841  * Returns: %TRUE if the stream is closed.
842  **/
843 gboolean
844 g_input_stream_is_closed (GInputStream *stream)
845 {
846   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
847   
848   return stream->priv->closed;
849 }
850  
851 /**
852  * g_input_stream_has_pending:
853  * @stream: input stream.
854  * 
855  * Checks if an input stream has pending actions.
856  * 
857  * Returns: %TRUE if @stream has pending actions.
858  **/  
859 gboolean
860 g_input_stream_has_pending (GInputStream *stream)
861 {
862   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
863   
864   return stream->priv->pending;
865 }
866
867 /**
868  * g_input_stream_set_pending:
869  * @stream: input stream
870  * @error: a #GError location to store the error occuring, or %NULL to 
871  * ignore.
872  * 
873  * Sets @stream to have actions pending. If the pending flag is
874  * already set or @stream is closed, it will return %FALSE and set
875  * @error.
876  *
877  * Return value: %TRUE if pending was previously unset and is now set.
878  **/
879 gboolean
880 g_input_stream_set_pending (GInputStream *stream, GError **error)
881 {
882   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
883   
884   if (stream->priv->closed)
885     {
886       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
887                    _("Stream is already closed"));
888       return FALSE;
889     }
890   
891   if (stream->priv->pending)
892     {
893       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
894                    _("Stream has outstanding operation"));
895       return FALSE;
896     }
897   
898   stream->priv->pending = TRUE;
899   return TRUE;
900 }
901
902 /**
903  * g_input_stream_clear_pending:
904  * @stream: input stream
905  * 
906  * Clears the pending flag on @stream.
907  **/
908 void
909 g_input_stream_clear_pending (GInputStream *stream)
910 {
911   g_return_if_fail (G_IS_INPUT_STREAM (stream));
912   
913   stream->priv->pending = FALSE;
914 }
915
916 /********************************************
917  *   Default implementation of async ops    *
918  ********************************************/
919
920 typedef struct {
921   void              *buffer;
922   gsize              count_requested;
923   gssize             count_read;
924 } ReadData;
925
926 static void
927 read_async_thread (GSimpleAsyncResult *res,
928                    GObject            *object,
929                    GCancellable       *cancellable)
930 {
931   ReadData *op;
932   GInputStreamClass *class;
933   GError *error = NULL;
934  
935   op = g_simple_async_result_get_op_res_gpointer (res);
936
937   class = G_INPUT_STREAM_GET_CLASS (object);
938
939   op->count_read = class->read_fn (G_INPUT_STREAM (object),
940                                    op->buffer, op->count_requested,
941                                    cancellable, &error);
942   if (op->count_read == -1)
943     {
944       g_simple_async_result_set_from_error (res, error);
945       g_error_free (error);
946     }
947 }
948
949 static void
950 g_input_stream_real_read_async (GInputStream        *stream,
951                                 void                *buffer,
952                                 gsize                count,
953                                 int                  io_priority,
954                                 GCancellable        *cancellable,
955                                 GAsyncReadyCallback  callback,
956                                 gpointer             user_data)
957 {
958   GSimpleAsyncResult *res;
959   ReadData *op;
960   
961   op = g_new (ReadData, 1);
962   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
963   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
964   op->buffer = buffer;
965   op->count_requested = count;
966   
967   g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
968   g_object_unref (res);
969 }
970
971 static gssize
972 g_input_stream_real_read_finish (GInputStream  *stream,
973                                  GAsyncResult  *result,
974                                  GError       **error)
975 {
976   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
977   ReadData *op;
978
979   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
980             g_input_stream_real_read_async);
981
982   op = g_simple_async_result_get_op_res_gpointer (simple);
983
984   return op->count_read;
985 }
986
987 typedef struct {
988   gsize count_requested;
989   gssize count_skipped;
990 } SkipData;
991
992
993 static void
994 skip_async_thread (GSimpleAsyncResult *res,
995                    GObject            *object,
996                    GCancellable       *cancellable)
997 {
998   SkipData *op;
999   GInputStreamClass *class;
1000   GError *error = NULL;
1001   
1002   class = G_INPUT_STREAM_GET_CLASS (object);
1003   op = g_simple_async_result_get_op_res_gpointer (res);
1004   op->count_skipped = class->skip (G_INPUT_STREAM (object),
1005                                    op->count_requested,
1006                                    cancellable, &error);
1007   if (op->count_skipped == -1)
1008     {
1009       g_simple_async_result_set_from_error (res, error);
1010       g_error_free (error);
1011     }
1012 }
1013
1014 typedef struct {
1015   char buffer[8192];
1016   gsize count;
1017   gsize count_skipped;
1018   int io_prio;
1019   GCancellable *cancellable;
1020   gpointer user_data;
1021   GAsyncReadyCallback callback;
1022 } SkipFallbackAsyncData;
1023
1024 static void
1025 skip_callback_wrapper (GObject      *source_object,
1026                        GAsyncResult *res,
1027                        gpointer      user_data)
1028 {
1029   GInputStreamClass *class;
1030   SkipFallbackAsyncData *data = user_data;
1031   SkipData *op;
1032   GSimpleAsyncResult *simple;
1033   GError *error = NULL;
1034   gssize ret;
1035
1036   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1037
1038   if (ret > 0)
1039     {
1040       data->count -= ret;
1041       data->count_skipped += ret;
1042
1043       if (data->count > 0)
1044         {
1045           class = G_INPUT_STREAM_GET_CLASS (source_object);
1046           class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1047                              skip_callback_wrapper, data);
1048           return;
1049         }
1050     }
1051
1052   op = g_new0 (SkipData, 1);
1053   op->count_skipped = data->count_skipped;
1054   simple = g_simple_async_result_new (source_object,
1055                                       data->callback, data->user_data,
1056                                       g_input_stream_real_skip_async);
1057
1058   g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1059
1060   if (ret == -1)
1061     {
1062       if (data->count_skipped && 
1063           error->domain == G_IO_ERROR &&
1064           error->code == G_IO_ERROR_CANCELLED)
1065         { /* No error, return partial read */ }
1066       else
1067         g_simple_async_result_set_from_error (simple, error);
1068       g_error_free (error);
1069     }
1070
1071   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1072   g_simple_async_result_complete (simple);
1073   g_object_unref (simple);
1074   
1075   g_free (data);
1076  }
1077
1078 static void
1079 g_input_stream_real_skip_async (GInputStream        *stream,
1080                                 gsize                count,
1081                                 int                  io_priority,
1082                                 GCancellable        *cancellable,
1083                                 GAsyncReadyCallback  callback,
1084                                 gpointer             user_data)
1085 {
1086   GInputStreamClass *class;
1087   SkipData *op;
1088   SkipFallbackAsyncData *data;
1089   GSimpleAsyncResult *res;
1090
1091   class = G_INPUT_STREAM_GET_CLASS (stream);
1092
1093   if (class->read_async == g_input_stream_real_read_async)
1094     {
1095       /* Read is thread-using async fallback.
1096        * Make skip use threads too, so that we can use a possible sync skip
1097        * implementation. */
1098       op = g_new0 (SkipData, 1);
1099       
1100       res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1101                                        g_input_stream_real_skip_async);
1102
1103       g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1104
1105       op->count_requested = count;
1106
1107       g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1108       g_object_unref (res);
1109     }
1110   else
1111     {
1112       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1113       
1114       /* There is a custom async read function, lets use that. */
1115       data = g_new (SkipFallbackAsyncData, 1);
1116       data->count = count;
1117       data->count_skipped = 0;
1118       data->io_prio = io_priority;
1119       data->cancellable = cancellable;
1120       data->callback = callback;
1121       data->user_data = user_data;
1122       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1123                          skip_callback_wrapper, data);
1124     }
1125
1126 }
1127
1128 static gssize
1129 g_input_stream_real_skip_finish (GInputStream  *stream,
1130                                  GAsyncResult  *result,
1131                                  GError       **error)
1132 {
1133   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1134   SkipData *op;
1135
1136   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1137   op = g_simple_async_result_get_op_res_gpointer (simple);
1138   return op->count_skipped;
1139 }
1140
1141 static void
1142 close_async_thread (GSimpleAsyncResult *res,
1143                     GObject            *object,
1144                     GCancellable       *cancellable)
1145 {
1146   GInputStreamClass *class;
1147   GError *error = NULL;
1148   gboolean result;
1149
1150   /* Auto handling of cancelation disabled, and ignore
1151      cancellation, since we want to close things anyway, although
1152      possibly in a quick-n-dirty way. At least we never want to leak
1153      open handles */
1154   
1155   class = G_INPUT_STREAM_GET_CLASS (object);
1156   result = class->close_fn (G_INPUT_STREAM (object), cancellable, &error);
1157   if (!result)
1158     {
1159       g_simple_async_result_set_from_error (res, error);
1160       g_error_free (error);
1161     }
1162 }
1163
1164 static void
1165 g_input_stream_real_close_async (GInputStream        *stream,
1166                                  int                  io_priority,
1167                                  GCancellable        *cancellable,
1168                                  GAsyncReadyCallback  callback,
1169                                  gpointer             user_data)
1170 {
1171   GSimpleAsyncResult *res;
1172   
1173   res = g_simple_async_result_new (G_OBJECT (stream),
1174                                    callback,
1175                                    user_data,
1176                                    g_input_stream_real_close_async);
1177
1178   g_simple_async_result_set_handle_cancellation (res, FALSE);
1179   
1180   g_simple_async_result_run_in_thread (res,
1181                                        close_async_thread,
1182                                        io_priority,
1183                                        cancellable);
1184   g_object_unref (res);
1185 }
1186
1187 static gboolean
1188 g_input_stream_real_close_finish (GInputStream  *stream,
1189                                   GAsyncResult  *result,
1190                                   GError       **error)
1191 {
1192   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1193   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
1194   return TRUE;
1195 }
1196
1197 #define __G_INPUT_STREAM_C__
1198 #include "gioaliasdef.c"