Explain I/O priorieties
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gseekable.h"
29 #include "gsimpleasyncresult.h"
30
31 #include "gioalias.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: base class for implementing streaming input
36  *
37  * 
38  * 
39  **/
40
41 G_DEFINE_TYPE (GInputStream, g_input_stream, G_TYPE_OBJECT);
42
43 struct _GInputStreamPrivate {
44   guint closed : 1;
45   guint pending : 1;
46   GAsyncReadyCallback outstanding_callback;
47 };
48
49 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
50                                                   gsize                 count,
51                                                   GCancellable         *cancellable,
52                                                   GError              **error);
53 static void     g_input_stream_real_read_async   (GInputStream         *stream,
54                                                   void                 *buffer,
55                                                   gsize                 count,
56                                                   int                   io_priority,
57                                                   GCancellable         *cancellable,
58                                                   GAsyncReadyCallback   callback,
59                                                   gpointer              user_data);
60 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
61                                                   GAsyncResult         *result,
62                                                   GError              **error);
63 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
64                                                   gsize                 count,
65                                                   int                   io_priority,
66                                                   GCancellable         *cancellable,
67                                                   GAsyncReadyCallback   callback,
68                                                   gpointer              data);
69 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
70                                                   GAsyncResult         *result,
71                                                   GError              **error);
72 static void     g_input_stream_real_close_async  (GInputStream         *stream,
73                                                   int                   io_priority,
74                                                   GCancellable         *cancellable,
75                                                   GAsyncReadyCallback   callback,
76                                                   gpointer              data);
77 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
78                                                   GAsyncResult         *result,
79                                                   GError              **error);
80
81 static void
82 g_input_stream_finalize (GObject *object)
83 {
84   GInputStream *stream;
85
86   stream = G_INPUT_STREAM (object);
87   
88   if (!stream->priv->closed)
89     g_input_stream_close (stream, NULL, NULL);
90
91   if (G_OBJECT_CLASS (g_input_stream_parent_class)->finalize)
92     (*G_OBJECT_CLASS (g_input_stream_parent_class)->finalize) (object);
93 }
94
95 static void
96 g_input_stream_dispose (GObject *object)
97 {
98   GInputStream *stream;
99
100   stream = G_INPUT_STREAM (object);
101   
102   if (!stream->priv->closed)
103     g_input_stream_close (stream, NULL, NULL);
104   
105   if (G_OBJECT_CLASS (g_input_stream_parent_class)->dispose)
106     (*G_OBJECT_CLASS (g_input_stream_parent_class)->dispose) (object);
107 }
108
109
110 static void
111 g_input_stream_class_init (GInputStreamClass *klass)
112 {
113   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
114   
115   g_type_class_add_private (klass, sizeof (GInputStreamPrivate));
116   
117   gobject_class->finalize = g_input_stream_finalize;
118   gobject_class->dispose = g_input_stream_dispose;
119   
120   klass->skip = g_input_stream_real_skip;
121   klass->read_async = g_input_stream_real_read_async;
122   klass->read_finish = g_input_stream_real_read_finish;
123   klass->skip_async = g_input_stream_real_skip_async;
124   klass->skip_finish = g_input_stream_real_skip_finish;
125   klass->close_async = g_input_stream_real_close_async;
126   klass->close_finish = g_input_stream_real_close_finish;
127 }
128
129 static void
130 g_input_stream_init (GInputStream *stream)
131 {
132   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
133                                               G_TYPE_INPUT_STREAM,
134                                               GInputStreamPrivate);
135 }
136
137 /**
138  * g_input_stream_read:
139  * @stream: a #GInputStream.
140  * @buffer: a buffer to read data into (which should be at least count bytes long).
141  * @count: the number of bytes that will be read from the stream
142  * @cancellable: optional #GCancellable object, %NULL to ignore.
143  * @error: location to store the error occuring, or %NULL to ignore
144  *
145  * Tries to read @count bytes from the stream into the buffer starting at
146  * @buffer. Will block during this read.
147  * 
148  * If count is zero returns zero and does nothing. A value of @count
149  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
150  *
151  * On success, the number of bytes read into the buffer is returned.
152  * It is not an error if this is not the same as the requested size, as it
153  * can happen e.g. near the end of a file. Zero is returned on end of file
154  * (or if @count is zero),  but never otherwise.
155  *
156  * If @cancellable is not NULL, then the operation can be cancelled by
157  * triggering the cancellable object from another thread. If the operation
158  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
159  * operation was partially finished when the operation was cancelled the
160  * partial result will be returned, without an error.
161  *
162  * On error -1 is returned and @error is set accordingly.
163  * 
164  * Return value: Number of bytes read, or -1 on error
165  **/
166 gssize
167 g_input_stream_read  (GInputStream  *stream,
168                       void          *buffer,
169                       gsize          count,
170                       GCancellable  *cancellable,
171                       GError       **error)
172 {
173   GInputStreamClass *class;
174   gssize res;
175
176   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
177   g_return_val_if_fail (buffer != NULL, 0);
178
179   if (count == 0)
180     return 0;
181   
182   if (((gssize) count) < 0)
183     {
184       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
185                    _("Too large count value passed to g_input_stream_read"));
186       return -1;
187     }
188
189   if (stream->priv->closed)
190     {
191       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
192                    _("Stream is already closed"));
193       return -1;
194     }
195   
196   if (stream->priv->pending)
197     {
198       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
199                    _("Stream has outstanding operation"));
200       return -1;
201     }
202   
203   class = G_INPUT_STREAM_GET_CLASS (stream);
204
205   if (class->read == NULL) 
206     {
207       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
208                    _("Input stream doesn't implement read"));
209       return -1;
210     }
211
212   if (cancellable)
213     g_push_current_cancellable (cancellable);
214   
215   stream->priv->pending = TRUE;
216   res = class->read (stream, buffer, count, cancellable, error);
217   stream->priv->pending = FALSE;
218
219   if (cancellable)
220     g_pop_current_cancellable (cancellable);
221   
222   return res;
223 }
224
225 /**
226  * g_input_stream_read_all:
227  * @stream: a #GInputStream.
228  * @buffer: a buffer to read data into (which should be at least count bytes long).
229  * @count: the number of bytes that will be read from the stream
230  * @bytes_read: location to store the number of bytes that was read from the stream
231  * @cancellable: optional #GCancellable object, %NULL to ignore.
232  * @error: location to store the error occuring, or %NULL to ignore
233  *
234  * Tries to read @count bytes from the stream into the buffer starting at
235  * @buffer. Will block during this read.
236  *
237  * This function is similar to g_input_stream_read(), except it tries to
238  * read as many bytes as requested, only stopping on an error or end of stream.
239  *
240  * On a successful read of @count bytes, or if we reached the end of the
241  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
242  * read into @buffer.
243  * 
244  * If there is an error during the operation %FALSE is returned and @error
245  * is set to indicate the error status, @bytes_read is updated to contain
246  * the number of bytes read into @buffer before the error occured.
247  *
248  * Return value: %TRUE on success, %FALSE if there was an error
249  **/
250 gboolean
251 g_input_stream_read_all (GInputStream  *stream,
252                          void          *buffer,
253                          gsize          count,
254                          gsize         *bytes_read,
255                          GCancellable  *cancellable,
256                          GError       **error)
257 {
258   gsize _bytes_read;
259   gssize res;
260
261   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
262   g_return_val_if_fail (buffer != NULL, FALSE);
263
264   _bytes_read = 0;
265   while (_bytes_read < count)
266     {
267       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
268                                  cancellable, error);
269       if (res == -1)
270         {
271           if (bytes_read)
272             *bytes_read = _bytes_read;
273           return FALSE;
274         }
275       
276       if (res == 0)
277         break;
278
279       _bytes_read += res;
280     }
281
282   if (bytes_read)
283     *bytes_read = _bytes_read;
284   return TRUE;
285 }
286
287 /**
288  * g_input_stream_skip:
289  * @stream: a #GInputStream.
290  * @count: the number of bytes that will be skipped from the stream
291  * @cancellable: optional #GCancellable object, %NULL to ignore. 
292  * @error: location to store the error occuring, or %NULL to ignore
293  *
294  * Tries to skip @count bytes from the stream. Will block during the operation.
295  *
296  * This is identical to g_input_stream_read(), from a behaviour standpoint,
297  * but the bytes that are skipped are not returned to the user. Some
298  * streams have an implementation that is more efficient than reading the data.
299  *
300  * This function is optional for inherited classes, as the default implementation
301  * emulates it using read.
302  *
303  * If @cancellable is not %NULL, then the operation can be cancelled by
304  * triggering the cancellable object from another thread. If the operation
305  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
306  * operation was partially finished when the operation was cancelled the
307  * partial result will be returned, without an error.
308  *
309  * Return value: Number of bytes skipped, or -1 on error
310  **/
311 gssize
312 g_input_stream_skip (GInputStream  *stream,
313                      gsize          count,
314                      GCancellable  *cancellable,
315                      GError       **error)
316 {
317   GInputStreamClass *class;
318   gssize res;
319
320   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
321
322   if (count == 0)
323     return 0;
324
325   if (((gssize) count) < 0)
326     {
327       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
328                    _("Too large count value passed to g_input_stream_skip"));
329       return -1;
330     }
331   
332   if (stream->priv->closed)
333     {
334       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
335                    _("Stream is already closed"));
336       return -1;
337     }
338
339   if (stream->priv->pending)
340     {
341       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
342                    _("Stream has outstanding operation"));
343       return -1;
344     }
345   
346   class = G_INPUT_STREAM_GET_CLASS (stream);
347
348   if (cancellable)
349     g_push_current_cancellable (cancellable);
350   
351   stream->priv->pending = TRUE;
352   res = class->skip (stream, count, cancellable, error);
353   stream->priv->pending = FALSE;
354
355   if (cancellable)
356     g_pop_current_cancellable (cancellable);
357   
358   return res;
359 }
360
361 static gssize
362 g_input_stream_real_skip (GInputStream  *stream,
363                           gsize          count,
364                           GCancellable  *cancellable,
365                           GError       **error)
366 {
367   GInputStreamClass *class;
368   gssize ret, read_bytes;
369   char buffer[8192];
370   GError *my_error;
371
372   class = G_INPUT_STREAM_GET_CLASS (stream);
373
374   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
375     {
376       if (g_seekable_seek (G_SEEKABLE (stream),
377                            count,
378                            G_SEEK_CUR,
379                            cancellable,
380                            NULL))
381         return count;
382     }
383
384   /* If not seekable, or seek failed, fall back to reading data: */
385
386   class = G_INPUT_STREAM_GET_CLASS (stream);
387   
388   read_bytes = 0;
389   while (1)
390     {
391       my_error = NULL;
392
393       ret = class->read (stream, buffer, MIN (sizeof (buffer), count),
394                          cancellable, &my_error);
395       if (ret == -1)
396         {
397           if (read_bytes > 0 &&
398               my_error->domain == G_IO_ERROR &&
399               my_error->code == G_IO_ERROR_CANCELLED)
400             {
401               g_error_free (my_error);
402               return read_bytes;
403             }
404           
405           g_propagate_error (error, my_error);
406           return -1;
407         }
408
409       count -= ret;
410       read_bytes += ret;
411       
412       if (ret == 0 || count == 0)
413         return read_bytes;
414     }
415 }
416
417 /**
418  * g_input_stream_close:
419  * @stream: A #GInputStream.
420  * @cancellable: optional #GCancellable object, %NULL to ignore.
421  * @error: location to store the error occuring, or %NULL to ignore
422  *
423  * Closes the stream, releasing resources related to it.
424  *
425  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
426  * Closing a stream multiple times will not return an error.
427  *
428  * Streams will be automatically closed when the last reference
429  * is dropped, but you might want to call make sure resources
430  * are released as early as possible.
431  *
432  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
433  * open after the stream is closed. See the documentation for the individual
434  * stream for details.
435  *
436  * On failure the first error that happened will be reported, but the close
437  * operation will finish as much as possible. A stream that failed to
438  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
439  * is important to check and report the error to the user.
440  *
441  * If @cancellable is not NULL, then the operation can be cancelled by
442  * triggering the cancellable object from another thread. If the operation
443  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
444  * Cancelling a close will still leave the stream closed, but some streams
445  * can use a faster close that doesn't block to e.g. check errors. 
446  *
447  * Return value: %TRUE on success, %FALSE on failure
448  **/
449 gboolean
450 g_input_stream_close (GInputStream  *stream,
451                       GCancellable  *cancellable,
452                       GError       **error)
453 {
454   GInputStreamClass *class;
455   gboolean res;
456
457   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
458
459   class = G_INPUT_STREAM_GET_CLASS (stream);
460
461   if (stream->priv->closed)
462     return TRUE;
463
464   if (stream->priv->pending)
465     {
466       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
467                    _("Stream has outstanding operation"));
468       return FALSE;
469     }
470   
471   res = TRUE;
472
473   stream->priv->pending = TRUE;
474
475   if (cancellable)
476     g_push_current_cancellable (cancellable);
477
478   if (class->close)
479     res = class->close (stream, cancellable, error);
480
481   if (cancellable)
482     g_pop_current_cancellable (cancellable);
483   
484   stream->priv->closed = TRUE;
485   
486   stream->priv->pending = FALSE;
487
488   return res;
489 }
490
491 static void
492 async_ready_callback_wrapper (GObject      *source_object,
493                               GAsyncResult *res,
494                               gpointer      user_data)
495 {
496   GInputStream *stream = G_INPUT_STREAM (source_object);
497
498   stream->priv->pending = FALSE;
499   if (stream->priv->outstanding_callback)
500     (*stream->priv->outstanding_callback) (source_object, res, user_data);
501   g_object_unref (stream);
502 }
503
504 static void
505 async_ready_close_callback_wrapper (GObject      *source_object,
506                                     GAsyncResult *res,
507                                     gpointer      user_data)
508 {
509   GInputStream *stream = G_INPUT_STREAM (source_object);
510
511   stream->priv->pending = FALSE;
512   stream->priv->closed = TRUE;
513   if (stream->priv->outstanding_callback)
514     (*stream->priv->outstanding_callback) (source_object, res, user_data);
515   g_object_unref (stream);
516 }
517
518 /**
519  * g_input_stream_read_async:
520  * @stream: A #GInputStream.
521  * @buffer: a buffer to read data into (which should be at least count bytes long).
522  * @count: the number of bytes that will be read from the stream
523  * @io_priority: the <link linkend="gio-GIOScheduler">I/O priority</link> 
524  * of the request. 
525  * @cancellable: optional #GCancellable object, %NULL to ignore.
526  * @callback: callback to call when the request is satisfied
527  * @user_data: the data to pass to callback function
528  *
529  * Request an asynchronous read of @count bytes from the stream into the buffer
530  * starting at @buffer. When the operation is finished @callback will be called,
531  * giving the results.
532  *
533  * During an async request no other sync and async calls are allowed, and will
534  * result in %G_IO_ERROR_PENDING errors. 
535  *
536  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
537  *
538  * On success, the number of bytes read into the buffer will be passed to the
539  * callback. It is not an error if this is not the same as the requested size, as it
540  * can happen e.g. near the end of a file, but generally we try to read
541  * as many bytes as requested. Zero is returned on end of file
542  * (or if @count is zero),  but never otherwise.
543  *
544  * Any outstanding i/o request with higher priority (lower numerical value) will
545  * be executed before an outstanding request with lower priority. Default
546  * priority is %G_PRIORITY_DEFAULT.
547  *
548  * The asyncronous methods have a default fallback that uses threads to implement
549  * asynchronicity, so they are optional for inheriting classes. However, if you
550  * override one you must override all.
551  **/
552 void
553 g_input_stream_read_async (GInputStream        *stream,
554                            void                *buffer,
555                            gsize                count,
556                            int                  io_priority,
557                            GCancellable        *cancellable,
558                            GAsyncReadyCallback  callback,
559                            gpointer             user_data)
560 {
561   GInputStreamClass *class;
562   GSimpleAsyncResult *simple;
563
564   g_return_if_fail (G_IS_INPUT_STREAM (stream));
565   g_return_if_fail (buffer != NULL);
566
567   if (count == 0)
568     {
569       simple = g_simple_async_result_new (G_OBJECT (stream),
570                                           callback,
571                                           user_data,
572                                           g_input_stream_read_async);
573       g_simple_async_result_complete_in_idle (simple);
574       g_object_unref (simple);
575       return;
576     }
577   
578   if (((gssize) count) < 0)
579     {
580       g_simple_async_report_error_in_idle (G_OBJECT (stream),
581                                            callback,
582                                            user_data,
583                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
584                                            _("Too large count value passed to g_input_stream_read_async"));
585       return;
586     }
587
588   if (stream->priv->closed)
589     {
590       g_simple_async_report_error_in_idle (G_OBJECT (stream),
591                                            callback,
592                                            user_data,
593                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
594                                            _("Stream is already closed"));
595       return;
596     }
597   
598   if (stream->priv->pending)
599     {
600       g_simple_async_report_error_in_idle (G_OBJECT (stream),
601                                            callback,
602                                            user_data,
603                                            G_IO_ERROR, G_IO_ERROR_PENDING,
604                                            _("Stream has outstanding operation"));
605       return;
606     }
607
608   class = G_INPUT_STREAM_GET_CLASS (stream);
609   
610   stream->priv->pending = TRUE;
611   stream->priv->outstanding_callback = callback;
612   g_object_ref (stream);
613   class->read_async (stream, buffer, count, io_priority, cancellable,
614                      async_ready_callback_wrapper, user_data);
615 }
616
617 /**
618  * g_input_stream_read_finish:
619  * @stream: a #GInputStream.
620  * @result: a #GAsyncResult.
621  * @error: a #GError location to store the error occuring, or %NULL to 
622  * ignore.
623  * 
624  * Finishes an asynchronous stream read operation. 
625  * 
626  * Returns: number of bytes read in, or -1 on error.
627  **/
628 gssize
629 g_input_stream_read_finish (GInputStream  *stream,
630                             GAsyncResult  *result,
631                             GError       **error)
632 {
633   GSimpleAsyncResult *simple;
634   GInputStreamClass *class;
635   
636   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
637   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
638
639   if (G_IS_SIMPLE_ASYNC_RESULT (result))
640     {
641       simple = G_SIMPLE_ASYNC_RESULT (result);
642       if (g_simple_async_result_propagate_error (simple, error))
643         return -1;
644
645       /* Special case read of 0 bytes */
646       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
647         return 0;
648     }
649
650   class = G_INPUT_STREAM_GET_CLASS (stream);
651   return class->read_finish (stream, result, error);
652 }
653
654 /**
655  * g_input_stream_skip_async:
656  * @stream: A #GInputStream.
657  * @count: the number of bytes that will be skipped from the stream
658  * @io_priority: the <link linkend="gio-GIOScheduler">I/O priority</link> 
659  * of the request. 
660  * @cancellable: optional #GCancellable object, %NULL to ignore. 
661  * @callback: callback to call when the request is satisfied
662  * @user_data: the data to pass to callback function
663  *
664  * Request an asynchronous skip of @count bytes from the stream into the buffer
665  * starting at @buffer. When the operation is finished @callback will be called,
666  * giving the results.
667  *
668  * During an async request no other sync and async calls are allowed, and will
669  * result in %G_IO_ERROR_PENDING errors. 
670  *
671  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
672  *
673  * On success, the number of bytes skipped will be passed to the
674  * callback. It is not an error if this is not the same as the requested size, as it
675  * can happen e.g. near the end of a file, but generally we try to skip
676  * as many bytes as requested. Zero is returned on end of file
677  * (or if @count is zero), but never otherwise.
678  *
679  * Any outstanding i/o request with higher priority (lower numerical value) will
680  * be executed before an outstanding request with lower priority. Default
681  * priority is %G_PRIORITY_DEFAULT.
682  *
683  * The asyncronous methods have a default fallback that uses threads to implement
684  * asynchronicity, so they are optional for inheriting classes. However, if you
685  * override one you must override all.
686  **/
687 void
688 g_input_stream_skip_async (GInputStream        *stream,
689                            gsize                count,
690                            int                  io_priority,
691                            GCancellable        *cancellable,
692                            GAsyncReadyCallback  callback,
693                            gpointer             user_data)
694 {
695   GInputStreamClass *class;
696   GSimpleAsyncResult *simple;
697
698   g_return_if_fail (G_IS_INPUT_STREAM (stream));
699
700   if (count == 0)
701     {
702       simple = g_simple_async_result_new (G_OBJECT (stream),
703                                           callback,
704                                           user_data,
705                                           g_input_stream_skip_async);
706
707       g_simple_async_result_complete_in_idle (simple);
708       g_object_unref (simple);
709       return;
710     }
711   
712   if (((gssize) count) < 0)
713     {
714       g_simple_async_report_error_in_idle (G_OBJECT (stream),
715                                            callback,
716                                            user_data,
717                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
718                                            _("Too large count value passed to g_input_stream_skip_async"));
719       return;
720     }
721
722   if (stream->priv->closed)
723     {
724       g_simple_async_report_error_in_idle (G_OBJECT (stream),
725                                            callback,
726                                            user_data,
727                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
728                                            _("Stream is already closed"));
729       return;
730     }
731   
732   if (stream->priv->pending)
733     {
734       g_simple_async_report_error_in_idle (G_OBJECT (stream),
735                                            callback,
736                                            user_data,
737                                            G_IO_ERROR, G_IO_ERROR_PENDING,
738                                            _("Stream has outstanding operation"));
739       return;
740     }
741
742   class = G_INPUT_STREAM_GET_CLASS (stream);
743   stream->priv->pending = TRUE;
744   stream->priv->outstanding_callback = callback;
745   g_object_ref (stream);
746   class->skip_async (stream, count, io_priority, cancellable,
747                      async_ready_callback_wrapper, user_data);
748 }
749
750 /**
751  * g_input_stream_skip_finish:
752  * @stream: a #GInputStream.
753  * @result: a #GAsyncResult.
754  * @error: a #GError location to store the error occuring, or %NULL to 
755  * ignore.
756  * 
757  * Finishes a stream skip operation.
758  * 
759  * Returns: the size of the bytes skipped, or %-1 on error.
760  **/
761 gssize
762 g_input_stream_skip_finish (GInputStream  *stream,
763                             GAsyncResult  *result,
764                             GError       **error)
765 {
766   GSimpleAsyncResult *simple;
767   GInputStreamClass *class;
768
769   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
770   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
771
772   if (G_IS_SIMPLE_ASYNC_RESULT (result))
773     {
774       simple = G_SIMPLE_ASYNC_RESULT (result);
775       if (g_simple_async_result_propagate_error (simple, error))
776         return -1;
777
778       /* Special case skip of 0 bytes */
779       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
780         return 0;
781     }
782
783   class = G_INPUT_STREAM_GET_CLASS (stream);
784   return class->skip_finish (stream, result, error);
785 }
786
787 /**
788  * g_input_stream_close_async:
789  * @stream: A #GInputStream.
790  * @io_priority: the <link linkend="gio-GIOScheduler">I/O priority</link> 
791  * of the request. 
792  * @cancellable: optional cancellable object
793  * @callback: callback to call when the request is satisfied
794  * @user_data: the data to pass to callback function
795  *
796  * Requests an asynchronous closes of the stream, releasing resources related to it.
797  * When the operation is finished @callback will be called, giving the results.
798  *
799  * For behaviour details see g_input_stream_close().
800  *
801  * The asyncronous methods have a default fallback that uses threads to implement
802  * asynchronicity, so they are optional for inheriting classes. However, if you
803  * override one you must override all.
804  **/
805 void
806 g_input_stream_close_async (GInputStream        *stream,
807                             int                  io_priority,
808                             GCancellable        *cancellable,
809                             GAsyncReadyCallback  callback,
810                             gpointer             user_data)
811 {
812   GInputStreamClass *class;
813   GSimpleAsyncResult *simple;
814
815   g_return_if_fail (G_IS_INPUT_STREAM (stream));
816
817   if (stream->priv->closed)
818     {
819       simple = g_simple_async_result_new (G_OBJECT (stream),
820                                           callback,
821                                           user_data,
822                                           g_input_stream_close_async);
823
824       g_simple_async_result_complete_in_idle (simple);
825       g_object_unref (simple);
826       return;
827     }
828
829   if (stream->priv->pending)
830     {
831       g_simple_async_report_error_in_idle (G_OBJECT (stream),
832                                            callback,
833                                            user_data,
834                                            G_IO_ERROR, G_IO_ERROR_PENDING,
835                                            _("Stream has outstanding operation"));
836       return;
837     }
838   
839   class = G_INPUT_STREAM_GET_CLASS (stream);
840   stream->priv->pending = TRUE;
841   stream->priv->outstanding_callback = callback;
842   g_object_ref (stream);
843   class->close_async (stream, io_priority, cancellable,
844                       async_ready_close_callback_wrapper, user_data);
845 }
846
847 /**
848  * g_input_stream_close_finish:
849  * @stream: a #GInputStream.
850  * @result: a #GAsyncResult.
851  * @error: a #GError location to store the error occuring, or %NULL to 
852  * ignore.
853  * 
854  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
855  * 
856  * Returns: %TRUE if the stream was closed successfully.
857  **/
858 gboolean
859 g_input_stream_close_finish (GInputStream  *stream,
860                              GAsyncResult  *result,
861                              GError       **error)
862 {
863   GSimpleAsyncResult *simple;
864   GInputStreamClass *class;
865
866   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
867   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
868
869   if (G_IS_SIMPLE_ASYNC_RESULT (result))
870     {
871       simple = G_SIMPLE_ASYNC_RESULT (result);
872       if (g_simple_async_result_propagate_error (simple, error))
873         return FALSE;
874
875       /* Special case already closed */
876       if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
877         return TRUE;
878     }
879
880   class = G_INPUT_STREAM_GET_CLASS (stream);
881   return class->close_finish (stream, result, error);
882 }
883
884 /**
885  * g_input_stream_is_closed:
886  * @stream: input stream.
887  * 
888  * Checks if an input stream is closed.
889  * 
890  * Returns: %TRUE if the stream is closed.
891  **/
892 gboolean
893 g_input_stream_is_closed (GInputStream *stream)
894 {
895   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
896   
897   return stream->priv->closed;
898 }
899  
900 /**
901  * g_input_stream_has_pending:
902  * @stream: input stream.
903  * 
904  * Checks if an input stream has pending actions.
905  * 
906  * Returns: %TRUE if @stream has pending actions.
907  **/  
908 gboolean
909 g_input_stream_has_pending (GInputStream *stream)
910 {
911   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
912   
913   return stream->priv->pending;
914 }
915
916 /**
917  * g_input_stream_set_pending:
918  * @stream: input stream
919  * @pending: boolean.
920  * 
921  * Sets @stream has actions pending.
922  **/
923 void
924 g_input_stream_set_pending (GInputStream *stream,
925                             gboolean      pending)
926 {
927   g_return_if_fail (G_IS_INPUT_STREAM (stream));
928   
929   stream->priv->pending = pending;
930 }
931
932 /********************************************
933  *   Default implementation of async ops    *
934  ********************************************/
935
936 typedef struct {
937   void              *buffer;
938   gsize              count_requested;
939   gssize             count_read;
940 } ReadData;
941
942 static void
943 read_async_thread (GSimpleAsyncResult *res,
944                    GObject            *object,
945                    GCancellable       *cancellable)
946 {
947   ReadData *op;
948   GInputStreamClass *class;
949   GError *error = NULL;
950  
951   op = g_simple_async_result_get_op_res_gpointer (res);
952
953   class = G_INPUT_STREAM_GET_CLASS (object);
954
955   op->count_read = class->read (G_INPUT_STREAM (object),
956                                 op->buffer, op->count_requested,
957                                 cancellable, &error);
958   if (op->count_read == -1)
959     {
960       g_simple_async_result_set_from_error (res, error);
961       g_error_free (error);
962     }
963 }
964
965 static void
966 g_input_stream_real_read_async (GInputStream        *stream,
967                                 void                *buffer,
968                                 gsize                count,
969                                 int                  io_priority,
970                                 GCancellable        *cancellable,
971                                 GAsyncReadyCallback  callback,
972                                 gpointer             user_data)
973 {
974   GSimpleAsyncResult *res;
975   ReadData *op;
976   
977   op = g_new (ReadData, 1);
978   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
979   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
980   op->buffer = buffer;
981   op->count_requested = count;
982   
983   g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
984   g_object_unref (res);
985 }
986
987 static gssize
988 g_input_stream_real_read_finish (GInputStream  *stream,
989                                  GAsyncResult  *result,
990                                  GError       **error)
991 {
992   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
993   ReadData *op;
994
995   g_assert (g_simple_async_result_get_source_tag (simple) == 
996             g_input_stream_real_read_async);
997
998   op = g_simple_async_result_get_op_res_gpointer (simple);
999
1000   return op->count_read;
1001 }
1002
1003 typedef struct {
1004   gsize count_requested;
1005   gssize count_skipped;
1006 } SkipData;
1007
1008
1009 static void
1010 skip_async_thread (GSimpleAsyncResult *res,
1011                    GObject            *object,
1012                    GCancellable       *cancellable)
1013 {
1014   SkipData *op;
1015   GInputStreamClass *class;
1016   GError *error = NULL;
1017   
1018   class = G_INPUT_STREAM_GET_CLASS (object);
1019   op = g_simple_async_result_get_op_res_gpointer (res);
1020   op->count_skipped = class->skip (G_INPUT_STREAM (object),
1021                                    op->count_requested,
1022                                    cancellable, &error);
1023   if (op->count_skipped == -1)
1024     {
1025       g_simple_async_result_set_from_error (res, error);
1026       g_error_free (error);
1027     }
1028 }
1029
1030 typedef struct {
1031   char buffer[8192];
1032   gsize count;
1033   gsize count_skipped;
1034   int io_prio;
1035   GCancellable *cancellable;
1036   gpointer user_data;
1037   GAsyncReadyCallback callback;
1038 } SkipFallbackAsyncData;
1039
1040 static void
1041 skip_callback_wrapper (GObject      *source_object,
1042                        GAsyncResult *res,
1043                        gpointer      user_data)
1044 {
1045   GInputStreamClass *class;
1046   SkipFallbackAsyncData *data = user_data;
1047   SkipData *op;
1048   GSimpleAsyncResult *simple;
1049   GError *error = NULL;
1050   gssize ret;
1051
1052   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1053
1054   if (ret > 0)
1055     {
1056       data->count -= ret;
1057       data->count_skipped += ret;
1058
1059       if (data->count > 0)
1060         {
1061           class = G_INPUT_STREAM_GET_CLASS (source_object);
1062           class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1063                              skip_callback_wrapper, data);
1064           return;
1065         }
1066     }
1067
1068   op = g_new0 (SkipData, 1);
1069   op->count_skipped = data->count_skipped;
1070   simple = g_simple_async_result_new (source_object,
1071                                       data->callback, data->user_data,
1072                                       g_input_stream_real_skip_async);
1073
1074   g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1075
1076   if (ret == -1)
1077     {
1078       if (data->count_skipped && 
1079           error->domain == G_IO_ERROR &&
1080           error->code == G_IO_ERROR_CANCELLED)
1081         { /* No error, return partial read */ }
1082       else
1083         g_simple_async_result_set_from_error (simple, error);
1084       g_error_free (error);
1085     }
1086
1087   /* Complete immediately, not in idle, since we're already in a mainloop callout */
1088   g_simple_async_result_complete (simple);
1089   g_object_unref (simple);
1090   
1091   g_free (data);
1092  }
1093
1094 static void
1095 g_input_stream_real_skip_async (GInputStream        *stream,
1096                                 gsize                count,
1097                                 int                  io_priority,
1098                                 GCancellable        *cancellable,
1099                                 GAsyncReadyCallback  callback,
1100                                 gpointer             user_data)
1101 {
1102   GInputStreamClass *class;
1103   SkipData *op;
1104   SkipFallbackAsyncData *data;
1105   GSimpleAsyncResult *res;
1106
1107   class = G_INPUT_STREAM_GET_CLASS (stream);
1108
1109   if (class->read_async == g_input_stream_real_read_async)
1110     {
1111       /* Read is thread-using async fallback.
1112        * Make skip use threads too, so that we can use a possible sync skip
1113        * implementation. */
1114       op = g_new0 (SkipData, 1);
1115       
1116       res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1117                                        g_input_stream_real_skip_async);
1118
1119       g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1120
1121       op->count_requested = count;
1122
1123       g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1124       g_object_unref (res);
1125     }
1126   else
1127     {
1128       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1129       
1130       /* There is a custom async read function, lets use that. */
1131       data = g_new (SkipFallbackAsyncData, 1);
1132       data->count = count;
1133       data->count_skipped = 0;
1134       data->io_prio = io_priority;
1135       data->cancellable = cancellable;
1136       data->callback = callback;
1137       data->user_data = user_data;
1138       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1139                          skip_callback_wrapper, data);
1140     }
1141
1142 }
1143
1144 static gssize
1145 g_input_stream_real_skip_finish (GInputStream  *stream,
1146                                  GAsyncResult  *result,
1147                                  GError       **error)
1148 {
1149   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1150   SkipData *op;
1151
1152   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1153   op = g_simple_async_result_get_op_res_gpointer (simple);
1154   return op->count_skipped;
1155 }
1156
1157 static void
1158 close_async_thread (GSimpleAsyncResult *res,
1159                     GObject            *object,
1160                     GCancellable       *cancellable)
1161 {
1162   GInputStreamClass *class;
1163   GError *error = NULL;
1164   gboolean result;
1165
1166   /* Auto handling of cancelation disabled, and ignore
1167      cancellation, since we want to close things anyway, although
1168      possibly in a quick-n-dirty way. At least we never want to leak
1169      open handles */
1170   
1171   class = G_INPUT_STREAM_GET_CLASS (object);
1172   result = class->close (G_INPUT_STREAM (object), cancellable, &error);
1173   if (!result)
1174     {
1175       g_simple_async_result_set_from_error (res, error);
1176       g_error_free (error);
1177     }
1178 }
1179
1180 static void
1181 g_input_stream_real_close_async (GInputStream        *stream,
1182                                  int                  io_priority,
1183                                  GCancellable        *cancellable,
1184                                  GAsyncReadyCallback  callback,
1185                                  gpointer             user_data)
1186 {
1187   GSimpleAsyncResult *res;
1188   
1189   res = g_simple_async_result_new (G_OBJECT (stream),
1190                                    callback,
1191                                    user_data,
1192                                    g_input_stream_real_close_async);
1193
1194   g_simple_async_result_set_handle_cancellation (res, FALSE);
1195   
1196   g_simple_async_result_run_in_thread (res,
1197                                        close_async_thread,
1198                                        io_priority,
1199                                        cancellable);
1200   g_object_unref (res);
1201 }
1202
1203 static gboolean
1204 g_input_stream_real_close_finish (GInputStream  *stream,
1205                                   GAsyncResult  *result,
1206                                   GError       **error)
1207 {
1208   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1209   g_assert (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
1210   return TRUE;
1211 }
1212
1213 #define __G_INPUT_STREAM_C__
1214 #include "gioaliasdef.c"