ab8ceb4db85c7a0758f9aeba04322585eae7f408
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GInputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   GAsyncReadyCallback outstanding_callback;
52 };
53
54 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
55
56 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
57                                                   gsize                 count,
58                                                   GCancellable         *cancellable,
59                                                   GError              **error);
60 static void     g_input_stream_real_read_async   (GInputStream         *stream,
61                                                   void                 *buffer,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              user_data);
67 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
71                                                   gsize                 count,
72                                                   int                   io_priority,
73                                                   GCancellable         *cancellable,
74                                                   GAsyncReadyCallback   callback,
75                                                   gpointer              data);
76 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
77                                                   GAsyncResult         *result,
78                                                   GError              **error);
79 static void     g_input_stream_real_close_async  (GInputStream         *stream,
80                                                   int                   io_priority,
81                                                   GCancellable         *cancellable,
82                                                   GAsyncReadyCallback   callback,
83                                                   gpointer              data);
84 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
85                                                   GAsyncResult         *result,
86                                                   GError              **error);
87
88 static void
89 g_input_stream_dispose (GObject *object)
90 {
91   GInputStream *stream;
92
93   stream = G_INPUT_STREAM (object);
94   
95   if (!stream->priv->closed)
96     g_input_stream_close (stream, NULL, NULL);
97
98   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
99 }
100
101
102 static void
103 g_input_stream_class_init (GInputStreamClass *klass)
104 {
105   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
106   
107   gobject_class->dispose = g_input_stream_dispose;
108   
109   klass->skip = g_input_stream_real_skip;
110   klass->read_async = g_input_stream_real_read_async;
111   klass->read_finish = g_input_stream_real_read_finish;
112   klass->skip_async = g_input_stream_real_skip_async;
113   klass->skip_finish = g_input_stream_real_skip_finish;
114   klass->close_async = g_input_stream_real_close_async;
115   klass->close_finish = g_input_stream_real_close_finish;
116 }
117
118 static void
119 g_input_stream_init (GInputStream *stream)
120 {
121   stream->priv = g_input_stream_get_instance_private (stream);
122 }
123
124 /**
125  * g_input_stream_read:
126  * @stream: a #GInputStream.
127  * @buffer: (array length=count) (element-type guint8): a buffer to
128  *     read data into (which should be at least count bytes long).
129  * @count: the number of bytes that will be read from the stream
130  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
131  * @error: location to store the error occurring, or %NULL to ignore
132  *
133  * Tries to read @count bytes from the stream into the buffer starting at
134  * @buffer. Will block during this read.
135  * 
136  * If count is zero returns zero and does nothing. A value of @count
137  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
138  *
139  * On success, the number of bytes read into the buffer is returned.
140  * It is not an error if this is not the same as the requested size, as it
141  * can happen e.g. near the end of a file. Zero is returned on end of file
142  * (or if @count is zero),  but never otherwise.
143  *
144  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
145  * at any position, and this function doesn't nul-terminate the @buffer.
146  *
147  * If @cancellable is not %NULL, then the operation can be cancelled by
148  * triggering the cancellable object from another thread. If the operation
149  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
150  * operation was partially finished when the operation was cancelled the
151  * partial result will be returned, without an error.
152  *
153  * On error -1 is returned and @error is set accordingly.
154  * 
155  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
156  **/
157 gssize
158 g_input_stream_read  (GInputStream  *stream,
159                       void          *buffer,
160                       gsize          count,
161                       GCancellable  *cancellable,
162                       GError       **error)
163 {
164   GInputStreamClass *class;
165   gssize res;
166
167   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
168   g_return_val_if_fail (buffer != NULL, 0);
169
170   if (count == 0)
171     return 0;
172   
173   if (((gssize) count) < 0)
174     {
175       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
176                    _("Too large count value passed to %s"), G_STRFUNC);
177       return -1;
178     }
179
180   class = G_INPUT_STREAM_GET_CLASS (stream);
181
182   if (class->read_fn == NULL) 
183     {
184       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
185                            _("Input stream doesn't implement read"));
186       return -1;
187     }
188
189   if (!g_input_stream_set_pending (stream, error))
190     return -1;
191
192   if (cancellable)
193     g_cancellable_push_current (cancellable);
194   
195   res = class->read_fn (stream, buffer, count, cancellable, error);
196
197   if (cancellable)
198     g_cancellable_pop_current (cancellable);
199   
200   g_input_stream_clear_pending (stream);
201
202   return res;
203 }
204
205 /**
206  * g_input_stream_read_all:
207  * @stream: a #GInputStream.
208  * @buffer: (array length=count) (element-type guint8): a buffer to
209  *     read data into (which should be at least count bytes long).
210  * @count: the number of bytes that will be read from the stream
211  * @bytes_read: (out): location to store the number of bytes that was read from the stream
212  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
213  * @error: location to store the error occurring, or %NULL to ignore
214  *
215  * Tries to read @count bytes from the stream into the buffer starting at
216  * @buffer. Will block during this read.
217  *
218  * This function is similar to g_input_stream_read(), except it tries to
219  * read as many bytes as requested, only stopping on an error or end of stream.
220  *
221  * On a successful read of @count bytes, or if we reached the end of the
222  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
223  * read into @buffer.
224  * 
225  * If there is an error during the operation %FALSE is returned and @error
226  * is set to indicate the error status, @bytes_read is updated to contain
227  * the number of bytes read into @buffer before the error occurred.
228  *
229  * Returns: %TRUE on success, %FALSE if there was an error
230  **/
231 gboolean
232 g_input_stream_read_all (GInputStream  *stream,
233                          void          *buffer,
234                          gsize          count,
235                          gsize         *bytes_read,
236                          GCancellable  *cancellable,
237                          GError       **error)
238 {
239   gsize _bytes_read;
240   gssize res;
241
242   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
243   g_return_val_if_fail (buffer != NULL, FALSE);
244
245   _bytes_read = 0;
246   while (_bytes_read < count)
247     {
248       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
249                                  cancellable, error);
250       if (res == -1)
251         {
252           if (bytes_read)
253             *bytes_read = _bytes_read;
254           return FALSE;
255         }
256       
257       if (res == 0)
258         break;
259
260       _bytes_read += res;
261     }
262
263   if (bytes_read)
264     *bytes_read = _bytes_read;
265   return TRUE;
266 }
267
268 /**
269  * g_input_stream_read_bytes:
270  * @stream: a #GInputStream.
271  * @count: maximum number of bytes that will be read from the stream. Common
272  * values include 4096 and 8192.
273  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
274  * @error: location to store the error occurring, or %NULL to ignore
275  *
276  * Like g_input_stream_read(), this tries to read @count bytes from
277  * the stream in a blocking fashion. However, rather than reading into
278  * a user-supplied buffer, this will create a new #GBytes containing
279  * the data that was read. This may be easier to use from language
280  * bindings.
281  *
282  * If count is zero, returns a zero-length #GBytes and does nothing. A
283  * value of @count larger than %G_MAXSSIZE will cause a
284  * %G_IO_ERROR_INVALID_ARGUMENT error.
285  *
286  * On success, a new #GBytes is returned. It is not an error if the
287  * size of this object is not the same as the requested size, as it
288  * can happen e.g. near the end of a file. A zero-length #GBytes is
289  * returned on end of file (or if @count is zero), but never
290  * otherwise.
291  *
292  * If @cancellable is not %NULL, then the operation can be cancelled by
293  * triggering the cancellable object from another thread. If the operation
294  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
295  * operation was partially finished when the operation was cancelled the
296  * partial result will be returned, without an error.
297  *
298  * On error %NULL is returned and @error is set accordingly.
299  *
300  * Returns: a new #GBytes, or %NULL on error
301  *
302  * Since: 2.34
303  **/
304 GBytes *
305 g_input_stream_read_bytes (GInputStream  *stream,
306                            gsize          count,
307                            GCancellable  *cancellable,
308                            GError       **error)
309 {
310   guchar *buf;
311   gssize nread;
312
313   buf = g_malloc (count);
314   nread = g_input_stream_read (stream, buf, count, cancellable, error);
315   if (nread == -1)
316     {
317       g_free (buf);
318       return NULL;
319     }
320   else if (nread == 0)
321     {
322       g_free (buf);
323       return g_bytes_new_static ("", 0);
324     }
325   else
326     return g_bytes_new_take (buf, nread);
327 }
328
329 /**
330  * g_input_stream_skip:
331  * @stream: a #GInputStream.
332  * @count: the number of bytes that will be skipped from the stream
333  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
334  * @error: location to store the error occurring, or %NULL to ignore
335  *
336  * Tries to skip @count bytes from the stream. Will block during the operation.
337  *
338  * This is identical to g_input_stream_read(), from a behaviour standpoint,
339  * but the bytes that are skipped are not returned to the user. Some
340  * streams have an implementation that is more efficient than reading the data.
341  *
342  * This function is optional for inherited classes, as the default implementation
343  * emulates it using read.
344  *
345  * If @cancellable is not %NULL, then the operation can be cancelled by
346  * triggering the cancellable object from another thread. If the operation
347  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
348  * operation was partially finished when the operation was cancelled the
349  * partial result will be returned, without an error.
350  *
351  * Returns: Number of bytes skipped, or -1 on error
352  **/
353 gssize
354 g_input_stream_skip (GInputStream  *stream,
355                      gsize          count,
356                      GCancellable  *cancellable,
357                      GError       **error)
358 {
359   GInputStreamClass *class;
360   gssize res;
361
362   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
363
364   if (count == 0)
365     return 0;
366
367   if (((gssize) count) < 0)
368     {
369       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
370                    _("Too large count value passed to %s"), G_STRFUNC);
371       return -1;
372     }
373   
374   class = G_INPUT_STREAM_GET_CLASS (stream);
375
376   if (!g_input_stream_set_pending (stream, error))
377     return -1;
378
379   if (cancellable)
380     g_cancellable_push_current (cancellable);
381   
382   res = class->skip (stream, count, cancellable, error);
383
384   if (cancellable)
385     g_cancellable_pop_current (cancellable);
386   
387   g_input_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 static gssize
393 g_input_stream_real_skip (GInputStream  *stream,
394                           gsize          count,
395                           GCancellable  *cancellable,
396                           GError       **error)
397 {
398   GInputStreamClass *class;
399   gssize ret, read_bytes;
400   char buffer[8192];
401   GError *my_error;
402
403   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
404     {
405       if (g_seekable_seek (G_SEEKABLE (stream),
406                            count,
407                            G_SEEK_CUR,
408                            cancellable,
409                            NULL))
410         return count;
411     }
412
413   /* If not seekable, or seek failed, fall back to reading data: */
414
415   class = G_INPUT_STREAM_GET_CLASS (stream);
416
417   read_bytes = 0;
418   while (1)
419     {
420       my_error = NULL;
421
422       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
423                             cancellable, &my_error);
424       if (ret == -1)
425         {
426           if (read_bytes > 0 &&
427               my_error->domain == G_IO_ERROR &&
428               my_error->code == G_IO_ERROR_CANCELLED)
429             {
430               g_error_free (my_error);
431               return read_bytes;
432             }
433
434           g_propagate_error (error, my_error);
435           return -1;
436         }
437
438       count -= ret;
439       read_bytes += ret;
440
441       if (ret == 0 || count == 0)
442         return read_bytes;
443     }
444 }
445
446 /**
447  * g_input_stream_close:
448  * @stream: A #GInputStream.
449  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
450  * @error: location to store the error occurring, or %NULL to ignore
451  *
452  * Closes the stream, releasing resources related to it.
453  *
454  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
455  * Closing a stream multiple times will not return an error.
456  *
457  * Streams will be automatically closed when the last reference
458  * is dropped, but you might want to call this function to make sure 
459  * resources are released as early as possible.
460  *
461  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
462  * open after the stream is closed. See the documentation for the individual
463  * stream for details.
464  *
465  * On failure the first error that happened will be reported, but the close
466  * operation will finish as much as possible. A stream that failed to
467  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
468  * is important to check and report the error to the user.
469  *
470  * If @cancellable is not %NULL, then the operation can be cancelled by
471  * triggering the cancellable object from another thread. If the operation
472  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
473  * Cancelling a close will still leave the stream closed, but some streams
474  * can use a faster close that doesn't block to e.g. check errors. 
475  *
476  * Returns: %TRUE on success, %FALSE on failure
477  **/
478 gboolean
479 g_input_stream_close (GInputStream  *stream,
480                       GCancellable  *cancellable,
481                       GError       **error)
482 {
483   GInputStreamClass *class;
484   gboolean res;
485
486   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
487
488   class = G_INPUT_STREAM_GET_CLASS (stream);
489
490   if (stream->priv->closed)
491     return TRUE;
492
493   res = TRUE;
494
495   if (!g_input_stream_set_pending (stream, error))
496     return FALSE;
497
498   if (cancellable)
499     g_cancellable_push_current (cancellable);
500
501   if (class->close_fn)
502     res = class->close_fn (stream, cancellable, error);
503
504   if (cancellable)
505     g_cancellable_pop_current (cancellable);
506
507   g_input_stream_clear_pending (stream);
508   
509   stream->priv->closed = TRUE;
510   
511   return res;
512 }
513
514 static void
515 async_ready_callback_wrapper (GObject      *source_object,
516                               GAsyncResult *res,
517                               gpointer      user_data)
518 {
519   GInputStream *stream = G_INPUT_STREAM (source_object);
520
521   g_input_stream_clear_pending (stream);
522   if (stream->priv->outstanding_callback)
523     (*stream->priv->outstanding_callback) (source_object, res, user_data);
524   g_object_unref (stream);
525 }
526
527 static void
528 async_ready_close_callback_wrapper (GObject      *source_object,
529                                     GAsyncResult *res,
530                                     gpointer      user_data)
531 {
532   GInputStream *stream = G_INPUT_STREAM (source_object);
533
534   g_input_stream_clear_pending (stream);
535   stream->priv->closed = TRUE;
536   if (stream->priv->outstanding_callback)
537     (*stream->priv->outstanding_callback) (source_object, res, user_data);
538   g_object_unref (stream);
539 }
540
541 /**
542  * g_input_stream_read_async:
543  * @stream: A #GInputStream.
544  * @buffer: (array length=count) (element-type guint8): a buffer to
545  *     read data into (which should be at least count bytes long).
546  * @count: the number of bytes that will be read from the stream
547  * @io_priority: the [I/O priority][io-priority]
548  * of the request. 
549  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
550  * @callback: (scope async): callback to call when the request is satisfied
551  * @user_data: (closure): the data to pass to callback function
552  *
553  * Request an asynchronous read of @count bytes from the stream into the buffer
554  * starting at @buffer. When the operation is finished @callback will be called. 
555  * You can then call g_input_stream_read_finish() to get the result of the 
556  * operation.
557  *
558  * During an async request no other sync and async calls are allowed on @stream, and will
559  * result in %G_IO_ERROR_PENDING errors. 
560  *
561  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
562  *
563  * On success, the number of bytes read into the buffer will be passed to the
564  * callback. It is not an error if this is not the same as the requested size, as it
565  * can happen e.g. near the end of a file, but generally we try to read
566  * as many bytes as requested. Zero is returned on end of file
567  * (or if @count is zero),  but never otherwise.
568  *
569  * Any outstanding i/o request with higher priority (lower numerical value) will
570  * be executed before an outstanding request with lower priority. Default
571  * priority is %G_PRIORITY_DEFAULT.
572  *
573  * The asyncronous methods have a default fallback that uses threads to implement
574  * asynchronicity, so they are optional for inheriting classes. However, if you
575  * override one you must override all.
576  **/
577 void
578 g_input_stream_read_async (GInputStream        *stream,
579                            void                *buffer,
580                            gsize                count,
581                            int                  io_priority,
582                            GCancellable        *cancellable,
583                            GAsyncReadyCallback  callback,
584                            gpointer             user_data)
585 {
586   GInputStreamClass *class;
587   GError *error = NULL;
588
589   g_return_if_fail (G_IS_INPUT_STREAM (stream));
590   g_return_if_fail (buffer != NULL);
591
592   if (count == 0)
593     {
594       GTask *task;
595
596       task = g_task_new (stream, cancellable, callback, user_data);
597       g_task_set_source_tag (task, g_input_stream_read_async);
598       g_task_return_int (task, 0);
599       g_object_unref (task);
600       return;
601     }
602   
603   if (((gssize) count) < 0)
604     {
605       g_task_report_new_error (stream, callback, user_data,
606                                g_input_stream_read_async,
607                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
608                                _("Too large count value passed to %s"),
609                                G_STRFUNC);
610       return;
611     }
612
613   if (!g_input_stream_set_pending (stream, &error))
614     {
615       g_task_report_error (stream, callback, user_data,
616                            g_input_stream_read_async,
617                            error);
618       return;
619     }
620
621   class = G_INPUT_STREAM_GET_CLASS (stream);
622   stream->priv->outstanding_callback = callback;
623   g_object_ref (stream);
624   class->read_async (stream, buffer, count, io_priority, cancellable,
625                      async_ready_callback_wrapper, user_data);
626 }
627
628 /**
629  * g_input_stream_read_finish:
630  * @stream: a #GInputStream.
631  * @result: a #GAsyncResult.
632  * @error: a #GError location to store the error occurring, or %NULL to 
633  * ignore.
634  * 
635  * Finishes an asynchronous stream read operation. 
636  * 
637  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
638  **/
639 gssize
640 g_input_stream_read_finish (GInputStream  *stream,
641                             GAsyncResult  *result,
642                             GError       **error)
643 {
644   GInputStreamClass *class;
645   
646   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
647   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
648
649   if (g_async_result_legacy_propagate_error (result, error))
650     return -1;
651   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
652     return g_task_propagate_int (G_TASK (result), error);
653
654   class = G_INPUT_STREAM_GET_CLASS (stream);
655   return class->read_finish (stream, result, error);
656 }
657
658 typedef struct
659 {
660   gchar *buffer;
661   gsize to_read;
662   gsize bytes_read;
663 } AsyncReadAll;
664
665 static void
666 free_async_read_all (gpointer data)
667 {
668   g_slice_free (AsyncReadAll, data);
669 }
670
671 static void
672 read_all_callback (GObject      *stream,
673                    GAsyncResult *result,
674                    gpointer      user_data)
675 {
676   GTask *task = user_data;
677   AsyncReadAll *data = g_task_get_task_data (task);
678   gboolean got_eof = FALSE;
679
680   if (result)
681     {
682       GError *error = NULL;
683       gssize nread;
684
685       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
686
687       if (nread == -1)
688         {
689           g_task_return_error (task, error);
690           g_object_unref (task);
691           return;
692         }
693
694       g_assert_cmpint (nread, <=, data->to_read);
695       data->to_read -= nread;
696       data->bytes_read += nread;
697       got_eof = (nread == 0);
698     }
699
700   if (got_eof || data->to_read == 0)
701     {
702       g_task_return_boolean (task, TRUE);
703       g_object_unref (task);
704     }
705
706   else
707     g_input_stream_read_async (G_INPUT_STREAM (stream),
708                                data->buffer + data->bytes_read,
709                                data->to_read,
710                                g_task_get_priority (task),
711                                g_task_get_cancellable (task),
712                                read_all_callback, task);
713 }
714
715
716 static void
717 read_all_async_thread (GTask        *task,
718                        gpointer      source_object,
719                        gpointer      task_data,
720                        GCancellable *cancellable)
721 {
722   GInputStream *stream = source_object;
723   AsyncReadAll *data = task_data;
724   GError *error = NULL;
725
726   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
727                                g_task_get_cancellable (task), &error))
728     g_task_return_boolean (task, TRUE);
729   else
730     g_task_return_error (task, error);
731 }
732
733 /**
734  * g_input_stream_read_all_async:
735  * @stream: A #GInputStream
736  * @buffer: (array length=count) (element-type guint8): a buffer to
737  *     read data into (which should be at least count bytes long)
738  * @count: the number of bytes that will be read from the stream
739  * @io_priority: the [I/O priority][io-priority] of the request
740  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
741  * @callback: (scope async): callback to call when the request is satisfied
742  * @user_data: (closure): the data to pass to callback function
743  *
744  * Request an asynchronous read of @count bytes from the stream into the
745  * buffer starting at @buffer.
746  *
747  * This is the asynchronous equivalent of g_input_stream_read_all().
748  *
749  * Call g_input_stream_read_all_finish() to collect the result.
750  *
751  * Any outstanding I/O request with higher priority (lower numerical
752  * value) will be executed before an outstanding request with lower
753  * priority. Default priority is %G_PRIORITY_DEFAULT.
754  *
755  * Since: 2.44
756  **/
757 void
758 g_input_stream_read_all_async (GInputStream        *stream,
759                                void                *buffer,
760                                gsize                count,
761                                int                  io_priority,
762                                GCancellable        *cancellable,
763                                GAsyncReadyCallback  callback,
764                                gpointer             user_data)
765 {
766   AsyncReadAll *data;
767   GTask *task;
768
769   g_return_if_fail (G_IS_INPUT_STREAM (stream));
770   g_return_if_fail (buffer != NULL || count == 0);
771
772   task = g_task_new (stream, cancellable, callback, user_data);
773   data = g_slice_new0 (AsyncReadAll);
774   data->buffer = buffer;
775   data->to_read = count;
776
777   g_task_set_task_data (task, data, free_async_read_all);
778   g_task_set_priority (task, io_priority);
779
780   /* If async reads are going to be handled via the threadpool anyway
781    * then we may as well do it with a single dispatch instead of
782    * bouncing in and out.
783    */
784   if (g_input_stream_async_read_is_via_threads (stream))
785     {
786       g_task_run_in_thread (task, read_all_async_thread);
787       g_object_unref (task);
788     }
789   else
790     read_all_callback (G_OBJECT (stream), NULL, task);
791 }
792
793 /**
794  * g_input_stream_read_all_finish:
795  * @stream: a #GInputStream
796  * @result: a #GAsyncResult
797  * @bytes_read: (out): location to store the number of bytes that was read from the stream
798  * @error: a #GError location to store the error occurring, or %NULL to ignore
799  *
800  * Finishes an asynchronous stream read operation started with
801  * g_input_stream_read_all_async().
802  *
803  * As a special exception to the normal conventions for functions that
804  * use #GError, if this function returns %FALSE (and sets @error) then
805  * @bytes_read will be set to the number of bytes that were successfully
806  * read before the error was encountered.  This functionality is only
807  * available from C.  If you need it from another language then you must
808  * write your own loop around g_input_stream_read_async().
809  *
810  * Returns: %TRUE on success, %FALSE if there was an error
811  *
812  * Since: 2.44
813  **/
814 gboolean
815 g_input_stream_read_all_finish (GInputStream  *stream,
816                                 GAsyncResult  *result,
817                                 gsize         *bytes_read,
818                                 GError       **error)
819 {
820   GTask *task;
821
822   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
823   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
824
825   task = G_TASK (result);
826
827   if (bytes_read)
828     {
829       AsyncReadAll *data = g_task_get_task_data (task);
830
831       *bytes_read = data->bytes_read;
832     }
833
834   return g_task_propagate_boolean (task, error);
835 }
836
837 static void
838 read_bytes_callback (GObject      *stream,
839                      GAsyncResult *result,
840                      gpointer      user_data)
841 {
842   GTask *task = user_data;
843   guchar *buf = g_task_get_task_data (task);
844   GError *error = NULL;
845   gssize nread;
846   GBytes *bytes = NULL;
847
848   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
849                                       result, &error);
850   if (nread == -1)
851     {
852       g_free (buf);
853       g_task_return_error (task, error);
854     }
855   else if (nread == 0)
856     {
857       g_free (buf);
858       bytes = g_bytes_new_static ("", 0);
859     }
860   else
861     bytes = g_bytes_new_take (buf, nread);
862
863   if (bytes)
864     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
865
866   g_object_unref (task);
867 }
868
869 /**
870  * g_input_stream_read_bytes_async:
871  * @stream: A #GInputStream.
872  * @count: the number of bytes that will be read from the stream
873  * @io_priority: the [I/O priority][io-priority] of the request
874  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
875  * @callback: (scope async): callback to call when the request is satisfied
876  * @user_data: (closure): the data to pass to callback function
877  *
878  * Request an asynchronous read of @count bytes from the stream into a
879  * new #GBytes. When the operation is finished @callback will be
880  * called. You can then call g_input_stream_read_bytes_finish() to get the
881  * result of the operation.
882  *
883  * During an async request no other sync and async calls are allowed
884  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
885  *
886  * A value of @count larger than %G_MAXSSIZE will cause a
887  * %G_IO_ERROR_INVALID_ARGUMENT error.
888  *
889  * On success, the new #GBytes will be passed to the callback. It is
890  * not an error if this is smaller than the requested size, as it can
891  * happen e.g. near the end of a file, but generally we try to read as
892  * many bytes as requested. Zero is returned on end of file (or if
893  * @count is zero), but never otherwise.
894  *
895  * Any outstanding I/O request with higher priority (lower numerical
896  * value) will be executed before an outstanding request with lower
897  * priority. Default priority is %G_PRIORITY_DEFAULT.
898  *
899  * Since: 2.34
900  **/
901 void
902 g_input_stream_read_bytes_async (GInputStream          *stream,
903                                  gsize                  count,
904                                  int                    io_priority,
905                                  GCancellable          *cancellable,
906                                  GAsyncReadyCallback    callback,
907                                  gpointer               user_data)
908 {
909   GTask *task;
910   guchar *buf;
911
912   task = g_task_new (stream, cancellable, callback, user_data);
913   buf = g_malloc (count);
914   g_task_set_task_data (task, buf, NULL);
915
916   g_input_stream_read_async (stream, buf, count,
917                              io_priority, cancellable,
918                              read_bytes_callback, task);
919 }
920
921 /**
922  * g_input_stream_read_bytes_finish:
923  * @stream: a #GInputStream.
924  * @result: a #GAsyncResult.
925  * @error: a #GError location to store the error occurring, or %NULL to
926  *   ignore.
927  *
928  * Finishes an asynchronous stream read-into-#GBytes operation.
929  *
930  * Returns: the newly-allocated #GBytes, or %NULL on error
931  *
932  * Since: 2.34
933  **/
934 GBytes *
935 g_input_stream_read_bytes_finish (GInputStream  *stream,
936                                   GAsyncResult  *result,
937                                   GError       **error)
938 {
939   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
940   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
941
942   return g_task_propagate_pointer (G_TASK (result), error);
943 }
944
945 /**
946  * g_input_stream_skip_async:
947  * @stream: A #GInputStream.
948  * @count: the number of bytes that will be skipped from the stream
949  * @io_priority: the [I/O priority][io-priority] of the request
950  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
951  * @callback: (scope async): callback to call when the request is satisfied
952  * @user_data: (closure): the data to pass to callback function
953  *
954  * Request an asynchronous skip of @count bytes from the stream.
955  * When the operation is finished @callback will be called.
956  * You can then call g_input_stream_skip_finish() to get the result
957  * of the operation.
958  *
959  * During an async request no other sync and async calls are allowed,
960  * and will result in %G_IO_ERROR_PENDING errors.
961  *
962  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
963  *
964  * On success, the number of bytes skipped will be passed to the callback.
965  * It is not an error if this is not the same as the requested size, as it
966  * can happen e.g. near the end of a file, but generally we try to skip
967  * as many bytes as requested. Zero is returned on end of file
968  * (or if @count is zero), but never otherwise.
969  *
970  * Any outstanding i/o request with higher priority (lower numerical value)
971  * will be executed before an outstanding request with lower priority.
972  * Default priority is %G_PRIORITY_DEFAULT.
973  *
974  * The asynchronous methods have a default fallback that uses threads to
975  * implement asynchronicity, so they are optional for inheriting classes.
976  * However, if you override one, you must override all.
977  **/
978 void
979 g_input_stream_skip_async (GInputStream        *stream,
980                            gsize                count,
981                            int                  io_priority,
982                            GCancellable        *cancellable,
983                            GAsyncReadyCallback  callback,
984                            gpointer             user_data)
985 {
986   GInputStreamClass *class;
987   GError *error = NULL;
988
989   g_return_if_fail (G_IS_INPUT_STREAM (stream));
990
991   if (count == 0)
992     {
993       GTask *task;
994
995       task = g_task_new (stream, cancellable, callback, user_data);
996       g_task_set_source_tag (task, g_input_stream_skip_async);
997       g_task_return_int (task, 0);
998       g_object_unref (task);
999       return;
1000     }
1001   
1002   if (((gssize) count) < 0)
1003     {
1004       g_task_report_new_error (stream, callback, user_data,
1005                                g_input_stream_skip_async,
1006                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1007                                _("Too large count value passed to %s"),
1008                                G_STRFUNC);
1009       return;
1010     }
1011
1012   if (!g_input_stream_set_pending (stream, &error))
1013     {
1014       g_task_report_error (stream, callback, user_data,
1015                            g_input_stream_skip_async,
1016                            error);
1017       return;
1018     }
1019
1020   class = G_INPUT_STREAM_GET_CLASS (stream);
1021   stream->priv->outstanding_callback = callback;
1022   g_object_ref (stream);
1023   class->skip_async (stream, count, io_priority, cancellable,
1024                      async_ready_callback_wrapper, user_data);
1025 }
1026
1027 /**
1028  * g_input_stream_skip_finish:
1029  * @stream: a #GInputStream.
1030  * @result: a #GAsyncResult.
1031  * @error: a #GError location to store the error occurring, or %NULL to 
1032  * ignore.
1033  * 
1034  * Finishes a stream skip operation.
1035  * 
1036  * Returns: the size of the bytes skipped, or %-1 on error.
1037  **/
1038 gssize
1039 g_input_stream_skip_finish (GInputStream  *stream,
1040                             GAsyncResult  *result,
1041                             GError       **error)
1042 {
1043   GInputStreamClass *class;
1044
1045   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1046   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1047
1048   if (g_async_result_legacy_propagate_error (result, error))
1049     return -1;
1050   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1051     return g_task_propagate_int (G_TASK (result), error);
1052
1053   class = G_INPUT_STREAM_GET_CLASS (stream);
1054   return class->skip_finish (stream, result, error);
1055 }
1056
1057 /**
1058  * g_input_stream_close_async:
1059  * @stream: A #GInputStream.
1060  * @io_priority: the [I/O priority][io-priority] of the request
1061  * @cancellable: (allow-none): optional cancellable object
1062  * @callback: (scope async): callback to call when the request is satisfied
1063  * @user_data: (closure): the data to pass to callback function
1064  *
1065  * Requests an asynchronous closes of the stream, releasing resources related to it.
1066  * When the operation is finished @callback will be called. 
1067  * You can then call g_input_stream_close_finish() to get the result of the 
1068  * operation.
1069  *
1070  * For behaviour details see g_input_stream_close().
1071  *
1072  * The asyncronous methods have a default fallback that uses threads to implement
1073  * asynchronicity, so they are optional for inheriting classes. However, if you
1074  * override one you must override all.
1075  **/
1076 void
1077 g_input_stream_close_async (GInputStream        *stream,
1078                             int                  io_priority,
1079                             GCancellable        *cancellable,
1080                             GAsyncReadyCallback  callback,
1081                             gpointer             user_data)
1082 {
1083   GInputStreamClass *class;
1084   GError *error = NULL;
1085
1086   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1087
1088   if (stream->priv->closed)
1089     {
1090       GTask *task;
1091
1092       task = g_task_new (stream, cancellable, callback, user_data);
1093       g_task_set_source_tag (task, g_input_stream_close_async);
1094       g_task_return_boolean (task, TRUE);
1095       g_object_unref (task);
1096       return;
1097     }
1098
1099   if (!g_input_stream_set_pending (stream, &error))
1100     {
1101       g_task_report_error (stream, callback, user_data,
1102                            g_input_stream_close_async,
1103                            error);
1104       return;
1105     }
1106   
1107   class = G_INPUT_STREAM_GET_CLASS (stream);
1108   stream->priv->outstanding_callback = callback;
1109   g_object_ref (stream);
1110   class->close_async (stream, io_priority, cancellable,
1111                       async_ready_close_callback_wrapper, user_data);
1112 }
1113
1114 /**
1115  * g_input_stream_close_finish:
1116  * @stream: a #GInputStream.
1117  * @result: a #GAsyncResult.
1118  * @error: a #GError location to store the error occurring, or %NULL to 
1119  * ignore.
1120  * 
1121  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1122  * 
1123  * Returns: %TRUE if the stream was closed successfully.
1124  **/
1125 gboolean
1126 g_input_stream_close_finish (GInputStream  *stream,
1127                              GAsyncResult  *result,
1128                              GError       **error)
1129 {
1130   GInputStreamClass *class;
1131
1132   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1133   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1134
1135   if (g_async_result_legacy_propagate_error (result, error))
1136     return FALSE;
1137   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1138     return g_task_propagate_boolean (G_TASK (result), error);
1139
1140   class = G_INPUT_STREAM_GET_CLASS (stream);
1141   return class->close_finish (stream, result, error);
1142 }
1143
1144 /**
1145  * g_input_stream_is_closed:
1146  * @stream: input stream.
1147  * 
1148  * Checks if an input stream is closed.
1149  * 
1150  * Returns: %TRUE if the stream is closed.
1151  **/
1152 gboolean
1153 g_input_stream_is_closed (GInputStream *stream)
1154 {
1155   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1156   
1157   return stream->priv->closed;
1158 }
1159  
1160 /**
1161  * g_input_stream_has_pending:
1162  * @stream: input stream.
1163  * 
1164  * Checks if an input stream has pending actions.
1165  * 
1166  * Returns: %TRUE if @stream has pending actions.
1167  **/  
1168 gboolean
1169 g_input_stream_has_pending (GInputStream *stream)
1170 {
1171   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1172   
1173   return stream->priv->pending;
1174 }
1175
1176 /**
1177  * g_input_stream_set_pending:
1178  * @stream: input stream
1179  * @error: a #GError location to store the error occurring, or %NULL to 
1180  * ignore.
1181  * 
1182  * Sets @stream to have actions pending. If the pending flag is
1183  * already set or @stream is closed, it will return %FALSE and set
1184  * @error.
1185  *
1186  * Returns: %TRUE if pending was previously unset and is now set.
1187  **/
1188 gboolean
1189 g_input_stream_set_pending (GInputStream *stream, GError **error)
1190 {
1191   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1192   
1193   if (stream->priv->closed)
1194     {
1195       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1196                            _("Stream is already closed"));
1197       return FALSE;
1198     }
1199   
1200   if (stream->priv->pending)
1201     {
1202       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1203                 /* Translators: This is an error you get if there is already an
1204                  * operation running against this stream when you try to start
1205                  * one */
1206                  _("Stream has outstanding operation"));
1207       return FALSE;
1208     }
1209   
1210   stream->priv->pending = TRUE;
1211   return TRUE;
1212 }
1213
1214 /**
1215  * g_input_stream_clear_pending:
1216  * @stream: input stream
1217  * 
1218  * Clears the pending flag on @stream.
1219  **/
1220 void
1221 g_input_stream_clear_pending (GInputStream *stream)
1222 {
1223   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1224   
1225   stream->priv->pending = FALSE;
1226 }
1227
1228 /**
1229  * g_input_stream_async_read_is_via_threads:
1230  * @stream: input stream
1231  *
1232  * Checks if an input stream's read_async function uses threads.
1233  *
1234  * Returns: %TRUE if @stream's read_async function uses threads.
1235  **/
1236 gboolean
1237 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1238 {
1239   GInputStreamClass *class;
1240
1241   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1242
1243   class = G_INPUT_STREAM_GET_CLASS (stream);
1244
1245   return (class->read_async == g_input_stream_real_read_async &&
1246       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1247         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1248 }
1249
1250 /********************************************
1251  *   Default implementation of async ops    *
1252  ********************************************/
1253
1254 typedef struct {
1255   void   *buffer;
1256   gsize   count;
1257 } ReadData;
1258
1259 static void
1260 free_read_data (ReadData *op)
1261 {
1262   g_slice_free (ReadData, op);
1263 }
1264
1265 static void
1266 read_async_thread (GTask        *task,
1267                    gpointer      source_object,
1268                    gpointer      task_data,
1269                    GCancellable *cancellable)
1270 {
1271   GInputStream *stream = source_object;
1272   ReadData *op = task_data;
1273   GInputStreamClass *class;
1274   GError *error = NULL;
1275   gssize nread;
1276  
1277   class = G_INPUT_STREAM_GET_CLASS (stream);
1278
1279   nread = class->read_fn (stream,
1280                           op->buffer, op->count,
1281                           g_task_get_cancellable (task),
1282                           &error);
1283   if (nread == -1)
1284     g_task_return_error (task, error);
1285   else
1286     g_task_return_int (task, nread);
1287 }
1288
1289 static void read_async_pollable (GPollableInputStream *stream,
1290                                  GTask                *task);
1291
1292 static gboolean
1293 read_async_pollable_ready (GPollableInputStream *stream,
1294                            gpointer              user_data)
1295 {
1296   GTask *task = user_data;
1297
1298   read_async_pollable (stream, task);
1299   return FALSE;
1300 }
1301
1302 static void
1303 read_async_pollable (GPollableInputStream *stream,
1304                      GTask                *task)
1305 {
1306   ReadData *op = g_task_get_task_data (task);
1307   GError *error = NULL;
1308   gssize nread;
1309
1310   if (g_task_return_error_if_cancelled (task))
1311     return;
1312
1313   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1314     read_nonblocking (stream, op->buffer, op->count, &error);
1315
1316   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1317     {
1318       GSource *source;
1319
1320       g_error_free (error);
1321
1322       source = g_pollable_input_stream_create_source (stream,
1323                                                       g_task_get_cancellable (task));
1324       g_task_attach_source (task, source,
1325                             (GSourceFunc) read_async_pollable_ready);
1326       g_source_unref (source);
1327       return;
1328     }
1329
1330   if (nread == -1)
1331     g_task_return_error (task, error);
1332   else
1333     g_task_return_int (task, nread);
1334   /* g_input_stream_real_read_async() unrefs task */
1335 }
1336
1337
1338 static void
1339 g_input_stream_real_read_async (GInputStream        *stream,
1340                                 void                *buffer,
1341                                 gsize                count,
1342                                 int                  io_priority,
1343                                 GCancellable        *cancellable,
1344                                 GAsyncReadyCallback  callback,
1345                                 gpointer             user_data)
1346 {
1347   GTask *task;
1348   ReadData *op;
1349   
1350   op = g_slice_new0 (ReadData);
1351   task = g_task_new (stream, cancellable, callback, user_data);
1352   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1353   g_task_set_priority (task, io_priority);
1354   op->buffer = buffer;
1355   op->count = count;
1356
1357   if (!g_input_stream_async_read_is_via_threads (stream))
1358     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1359   else
1360     g_task_run_in_thread (task, read_async_thread);
1361   g_object_unref (task);
1362 }
1363
1364 static gssize
1365 g_input_stream_real_read_finish (GInputStream  *stream,
1366                                  GAsyncResult  *result,
1367                                  GError       **error)
1368 {
1369   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1370
1371   return g_task_propagate_int (G_TASK (result), error);
1372 }
1373
1374
1375 static void
1376 skip_async_thread (GTask        *task,
1377                    gpointer      source_object,
1378                    gpointer      task_data,
1379                    GCancellable *cancellable)
1380 {
1381   GInputStream *stream = source_object;
1382   gsize count = GPOINTER_TO_SIZE (task_data);
1383   GInputStreamClass *class;
1384   GError *error = NULL;
1385   gssize ret;
1386
1387   class = G_INPUT_STREAM_GET_CLASS (stream);
1388   ret = class->skip (stream, count,
1389                      g_task_get_cancellable (task),
1390                      &error);
1391   if (ret == -1)
1392     g_task_return_error (task, error);
1393   else
1394     g_task_return_int (task, ret);
1395 }
1396
1397 typedef struct {
1398   char buffer[8192];
1399   gsize count;
1400   gsize count_skipped;
1401 } SkipFallbackAsyncData;
1402
1403 static void
1404 skip_callback_wrapper (GObject      *source_object,
1405                        GAsyncResult *res,
1406                        gpointer      user_data)
1407 {
1408   GInputStreamClass *class;
1409   GTask *task = user_data;
1410   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1411   GError *error = NULL;
1412   gssize ret;
1413
1414   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1415
1416   if (ret > 0)
1417     {
1418       data->count -= ret;
1419       data->count_skipped += ret;
1420
1421       if (data->count > 0)
1422         {
1423           class = G_INPUT_STREAM_GET_CLASS (source_object);
1424           class->read_async (G_INPUT_STREAM (source_object),
1425                              data->buffer, MIN (8192, data->count),
1426                              g_task_get_priority (task),
1427                              g_task_get_cancellable (task),
1428                              skip_callback_wrapper, task);
1429           return;
1430         }
1431     }
1432
1433   if (ret == -1 &&
1434       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1435       data->count_skipped)
1436     {
1437       /* No error, return partial read */
1438       g_clear_error (&error);
1439     }
1440
1441   if (error)
1442     g_task_return_error (task, error);
1443   else
1444     g_task_return_int (task, data->count_skipped);
1445   g_object_unref (task);
1446  }
1447
1448 static void
1449 g_input_stream_real_skip_async (GInputStream        *stream,
1450                                 gsize                count,
1451                                 int                  io_priority,
1452                                 GCancellable        *cancellable,
1453                                 GAsyncReadyCallback  callback,
1454                                 gpointer             user_data)
1455 {
1456   GInputStreamClass *class;
1457   SkipFallbackAsyncData *data;
1458   GTask *task;
1459
1460   class = G_INPUT_STREAM_GET_CLASS (stream);
1461
1462   task = g_task_new (stream, cancellable, callback, user_data);
1463   g_task_set_priority (task, io_priority);
1464
1465   if (g_input_stream_async_read_is_via_threads (stream))
1466     {
1467       /* Read is thread-using async fallback.
1468        * Make skip use threads too, so that we can use a possible sync skip
1469        * implementation. */
1470       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1471
1472       g_task_run_in_thread (task, skip_async_thread);
1473       g_object_unref (task);
1474     }
1475   else
1476     {
1477       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1478       
1479       /* There is a custom async read function, lets use that. */
1480       data = g_new (SkipFallbackAsyncData, 1);
1481       data->count = count;
1482       data->count_skipped = 0;
1483       g_task_set_task_data (task, data, g_free);
1484       g_task_set_check_cancellable (task, FALSE);
1485       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1486                          skip_callback_wrapper, task);
1487     }
1488
1489 }
1490
1491 static gssize
1492 g_input_stream_real_skip_finish (GInputStream  *stream,
1493                                  GAsyncResult  *result,
1494                                  GError       **error)
1495 {
1496   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1497
1498   return g_task_propagate_int (G_TASK (result), error);
1499 }
1500
1501 static void
1502 close_async_thread (GTask        *task,
1503                     gpointer      source_object,
1504                     gpointer      task_data,
1505                     GCancellable *cancellable)
1506 {
1507   GInputStream *stream = source_object;
1508   GInputStreamClass *class;
1509   GError *error = NULL;
1510   gboolean result;
1511
1512   class = G_INPUT_STREAM_GET_CLASS (stream);
1513   if (class->close_fn)
1514     {
1515       result = class->close_fn (stream,
1516                                 g_task_get_cancellable (task),
1517                                 &error);
1518       if (!result)
1519         {
1520           g_task_return_error (task, error);
1521           return;
1522         }
1523     }
1524
1525   g_task_return_boolean (task, TRUE);
1526 }
1527
1528 static void
1529 g_input_stream_real_close_async (GInputStream        *stream,
1530                                  int                  io_priority,
1531                                  GCancellable        *cancellable,
1532                                  GAsyncReadyCallback  callback,
1533                                  gpointer             user_data)
1534 {
1535   GTask *task;
1536
1537   task = g_task_new (stream, cancellable, callback, user_data);
1538   g_task_set_check_cancellable (task, FALSE);
1539   g_task_set_priority (task, io_priority);
1540   
1541   g_task_run_in_thread (task, close_async_thread);
1542   g_object_unref (task);
1543 }
1544
1545 static gboolean
1546 g_input_stream_real_close_finish (GInputStream  *stream,
1547                                   GAsyncResult  *result,
1548                                   GError       **error)
1549 {
1550   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1551
1552   return g_task_propagate_boolean (G_TASK (result), error);
1553 }