Imported Upstream version 2.66.6
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice().
44  *
45  * See the documentation for #GIOStream for details of thread safety of
46  * streaming APIs.
47  *
48  * All of these functions have async variants too.
49  **/
50
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
58
59 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
60                                                   gsize                 count,
61                                                   GCancellable         *cancellable,
62                                                   GError              **error);
63 static void     g_input_stream_real_read_async   (GInputStream         *stream,
64                                                   void                 *buffer,
65                                                   gsize                 count,
66                                                   int                   io_priority,
67                                                   GCancellable         *cancellable,
68                                                   GAsyncReadyCallback   callback,
69                                                   gpointer              user_data);
70 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
71                                                   GAsyncResult         *result,
72                                                   GError              **error);
73 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
74                                                   gsize                 count,
75                                                   int                   io_priority,
76                                                   GCancellable         *cancellable,
77                                                   GAsyncReadyCallback   callback,
78                                                   gpointer              data);
79 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
80                                                   GAsyncResult         *result,
81                                                   GError              **error);
82 static void     g_input_stream_real_close_async  (GInputStream         *stream,
83                                                   int                   io_priority,
84                                                   GCancellable         *cancellable,
85                                                   GAsyncReadyCallback   callback,
86                                                   gpointer              data);
87 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
88                                                   GAsyncResult         *result,
89                                                   GError              **error);
90
91 static void
92 g_input_stream_dispose (GObject *object)
93 {
94   GInputStream *stream;
95
96   stream = G_INPUT_STREAM (object);
97   
98   if (!stream->priv->closed)
99     g_input_stream_close (stream, NULL, NULL);
100
101   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102 }
103
104
105 static void
106 g_input_stream_class_init (GInputStreamClass *klass)
107 {
108   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109   
110   gobject_class->dispose = g_input_stream_dispose;
111   
112   klass->skip = g_input_stream_real_skip;
113   klass->read_async = g_input_stream_real_read_async;
114   klass->read_finish = g_input_stream_real_read_finish;
115   klass->skip_async = g_input_stream_real_skip_async;
116   klass->skip_finish = g_input_stream_real_skip_finish;
117   klass->close_async = g_input_stream_real_close_async;
118   klass->close_finish = g_input_stream_real_close_finish;
119 }
120
121 static void
122 g_input_stream_init (GInputStream *stream)
123 {
124   stream->priv = g_input_stream_get_instance_private (stream);
125 }
126
127 /**
128  * g_input_stream_read:
129  * @stream: a #GInputStream.
130  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131  *     a buffer to read data into (which should be at least count bytes long).
132  * @count: the number of bytes that will be read from the stream
133  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134  * @error: location to store the error occurring, or %NULL to ignore
135  *
136  * Tries to read @count bytes from the stream into the buffer starting at
137  * @buffer. Will block during this read.
138  * 
139  * If count is zero returns zero and does nothing. A value of @count
140  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141  *
142  * On success, the number of bytes read into the buffer is returned.
143  * It is not an error if this is not the same as the requested size, as it
144  * can happen e.g. near the end of a file. Zero is returned on end of file
145  * (or if @count is zero),  but never otherwise.
146  *
147  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148  * at any position, and this function doesn't nul-terminate the @buffer.
149  *
150  * If @cancellable is not %NULL, then the operation can be cancelled by
151  * triggering the cancellable object from another thread. If the operation
152  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153  * operation was partially finished when the operation was cancelled the
154  * partial result will be returned, without an error.
155  *
156  * On error -1 is returned and @error is set accordingly.
157  * 
158  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159  **/
160 gssize
161 g_input_stream_read  (GInputStream  *stream,
162                       void          *buffer,
163                       gsize          count,
164                       GCancellable  *cancellable,
165                       GError       **error)
166 {
167   GInputStreamClass *class;
168   gssize res;
169
170   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171   g_return_val_if_fail (buffer != NULL, 0);
172
173   if (count == 0)
174     return 0;
175   
176   if (((gssize) count) < 0)
177     {
178       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179                    _("Too large count value passed to %s"), G_STRFUNC);
180       return -1;
181     }
182
183   class = G_INPUT_STREAM_GET_CLASS (stream);
184
185   if (class->read_fn == NULL) 
186     {
187       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188                            _("Input stream doesn’t implement read"));
189       return -1;
190     }
191
192   if (!g_input_stream_set_pending (stream, error))
193     return -1;
194
195   if (cancellable)
196     g_cancellable_push_current (cancellable);
197   
198   res = class->read_fn (stream, buffer, count, cancellable, error);
199
200   if (cancellable)
201     g_cancellable_pop_current (cancellable);
202   
203   g_input_stream_clear_pending (stream);
204
205   return res;
206 }
207
208 /**
209  * g_input_stream_read_all:
210  * @stream: a #GInputStream.
211  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212  *     a buffer to read data into (which should be at least count bytes long).
213  * @count: the number of bytes that will be read from the stream
214  * @bytes_read: (out): location to store the number of bytes that was read from the stream
215  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216  * @error: location to store the error occurring, or %NULL to ignore
217  *
218  * Tries to read @count bytes from the stream into the buffer starting at
219  * @buffer. Will block during this read.
220  *
221  * This function is similar to g_input_stream_read(), except it tries to
222  * read as many bytes as requested, only stopping on an error or end of stream.
223  *
224  * On a successful read of @count bytes, or if we reached the end of the
225  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
226  * read into @buffer.
227  * 
228  * If there is an error during the operation %FALSE is returned and @error
229  * is set to indicate the error status.
230  *
231  * As a special exception to the normal conventions for functions that
232  * use #GError, if this function returns %FALSE (and sets @error) then
233  * @bytes_read will be set to the number of bytes that were successfully
234  * read before the error was encountered.  This functionality is only
235  * available from C.  If you need it from another language then you must
236  * write your own loop around g_input_stream_read().
237  *
238  * Returns: %TRUE on success, %FALSE if there was an error
239  **/
240 gboolean
241 g_input_stream_read_all (GInputStream  *stream,
242                          void          *buffer,
243                          gsize          count,
244                          gsize         *bytes_read,
245                          GCancellable  *cancellable,
246                          GError       **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258                                  cancellable, error);
259       if (res == -1)
260         {
261           if (bytes_read)
262             *bytes_read = _bytes_read;
263           return FALSE;
264         }
265       
266       if (res == 0)
267         break;
268
269       _bytes_read += res;
270     }
271
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276
277 /**
278  * g_input_stream_read_bytes:
279  * @stream: a #GInputStream.
280  * @count: maximum number of bytes that will be read from the stream. Common
281  * values include 4096 and 8192.
282  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283  * @error: location to store the error occurring, or %NULL to ignore
284  *
285  * Like g_input_stream_read(), this tries to read @count bytes from
286  * the stream in a blocking fashion. However, rather than reading into
287  * a user-supplied buffer, this will create a new #GBytes containing
288  * the data that was read. This may be easier to use from language
289  * bindings.
290  *
291  * If count is zero, returns a zero-length #GBytes and does nothing. A
292  * value of @count larger than %G_MAXSSIZE will cause a
293  * %G_IO_ERROR_INVALID_ARGUMENT error.
294  *
295  * On success, a new #GBytes is returned. It is not an error if the
296  * size of this object is not the same as the requested size, as it
297  * can happen e.g. near the end of a file. A zero-length #GBytes is
298  * returned on end of file (or if @count is zero), but never
299  * otherwise.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * On error %NULL is returned and @error is set accordingly.
308  *
309  * Returns: (transfer full): a new #GBytes, or %NULL on error
310  *
311  * Since: 2.34
312  **/
313 GBytes *
314 g_input_stream_read_bytes (GInputStream  *stream,
315                            gsize          count,
316                            GCancellable  *cancellable,
317                            GError       **error)
318 {
319   guchar *buf;
320   gssize nread;
321
322   buf = g_malloc (count);
323   nread = g_input_stream_read (stream, buf, count, cancellable, error);
324   if (nread == -1)
325     {
326       g_free (buf);
327       return NULL;
328     }
329   else if (nread == 0)
330     {
331       g_free (buf);
332       return g_bytes_new_static ("", 0);
333     }
334   else
335     return g_bytes_new_take (buf, nread);
336 }
337
338 /**
339  * g_input_stream_skip:
340  * @stream: a #GInputStream.
341  * @count: the number of bytes that will be skipped from the stream
342  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
343  * @error: location to store the error occurring, or %NULL to ignore
344  *
345  * Tries to skip @count bytes from the stream. Will block during the operation.
346  *
347  * This is identical to g_input_stream_read(), from a behaviour standpoint,
348  * but the bytes that are skipped are not returned to the user. Some
349  * streams have an implementation that is more efficient than reading the data.
350  *
351  * This function is optional for inherited classes, as the default implementation
352  * emulates it using read.
353  *
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357  * operation was partially finished when the operation was cancelled the
358  * partial result will be returned, without an error.
359  *
360  * Returns: Number of bytes skipped, or -1 on error
361  **/
362 gssize
363 g_input_stream_skip (GInputStream  *stream,
364                      gsize          count,
365                      GCancellable  *cancellable,
366                      GError       **error)
367 {
368   GInputStreamClass *class;
369   gssize res;
370
371   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372
373   if (count == 0)
374     return 0;
375
376   if (((gssize) count) < 0)
377     {
378       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379                    _("Too large count value passed to %s"), G_STRFUNC);
380       return -1;
381     }
382   
383   class = G_INPUT_STREAM_GET_CLASS (stream);
384
385   if (!g_input_stream_set_pending (stream, error))
386     return -1;
387
388   if (cancellable)
389     g_cancellable_push_current (cancellable);
390   
391   res = class->skip (stream, count, cancellable, error);
392
393   if (cancellable)
394     g_cancellable_pop_current (cancellable);
395   
396   g_input_stream_clear_pending (stream);
397
398   return res;
399 }
400
401 static gssize
402 g_input_stream_real_skip (GInputStream  *stream,
403                           gsize          count,
404                           GCancellable  *cancellable,
405                           GError       **error)
406 {
407   GInputStreamClass *class;
408   gssize ret, read_bytes;
409   char buffer[8192];
410   GError *my_error;
411
412   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413     {
414       if (g_seekable_seek (G_SEEKABLE (stream),
415                            count,
416                            G_SEEK_CUR,
417                            cancellable,
418                            NULL))
419         return count;
420     }
421
422   /* If not seekable, or seek failed, fall back to reading data: */
423
424   class = G_INPUT_STREAM_GET_CLASS (stream);
425
426   read_bytes = 0;
427   while (1)
428     {
429       my_error = NULL;
430
431       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
432                             cancellable, &my_error);
433       if (ret == -1)
434         {
435           if (read_bytes > 0 &&
436               my_error->domain == G_IO_ERROR &&
437               my_error->code == G_IO_ERROR_CANCELLED)
438             {
439               g_error_free (my_error);
440               return read_bytes;
441             }
442
443           g_propagate_error (error, my_error);
444           return -1;
445         }
446
447       count -= ret;
448       read_bytes += ret;
449
450       if (ret == 0 || count == 0)
451         return read_bytes;
452     }
453 }
454
455 /**
456  * g_input_stream_close:
457  * @stream: A #GInputStream.
458  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
459  * @error: location to store the error occurring, or %NULL to ignore
460  *
461  * Closes the stream, releasing resources related to it.
462  *
463  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
464  * Closing a stream multiple times will not return an error.
465  *
466  * Streams will be automatically closed when the last reference
467  * is dropped, but you might want to call this function to make sure 
468  * resources are released as early as possible.
469  *
470  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
471  * open after the stream is closed. See the documentation for the individual
472  * stream for details.
473  *
474  * On failure the first error that happened will be reported, but the close
475  * operation will finish as much as possible. A stream that failed to
476  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
477  * is important to check and report the error to the user.
478  *
479  * If @cancellable is not %NULL, then the operation can be cancelled by
480  * triggering the cancellable object from another thread. If the operation
481  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
482  * Cancelling a close will still leave the stream closed, but some streams
483  * can use a faster close that doesn't block to e.g. check errors. 
484  *
485  * Returns: %TRUE on success, %FALSE on failure
486  **/
487 gboolean
488 g_input_stream_close (GInputStream  *stream,
489                       GCancellable  *cancellable,
490                       GError       **error)
491 {
492   GInputStreamClass *class;
493   gboolean res;
494
495   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
496
497   class = G_INPUT_STREAM_GET_CLASS (stream);
498
499   if (stream->priv->closed)
500     return TRUE;
501
502   res = TRUE;
503
504   if (!g_input_stream_set_pending (stream, error))
505     return FALSE;
506
507   if (cancellable)
508     g_cancellable_push_current (cancellable);
509
510   if (class->close_fn)
511     res = class->close_fn (stream, cancellable, error);
512
513   if (cancellable)
514     g_cancellable_pop_current (cancellable);
515
516   g_input_stream_clear_pending (stream);
517   
518   stream->priv->closed = TRUE;
519   
520   return res;
521 }
522
523 static void
524 async_ready_callback_wrapper (GObject      *source_object,
525                               GAsyncResult *res,
526                               gpointer      user_data)
527 {
528   GInputStream *stream = G_INPUT_STREAM (source_object);
529
530   g_input_stream_clear_pending (stream);
531   if (stream->priv->outstanding_callback)
532     (*stream->priv->outstanding_callback) (source_object, res, user_data);
533   g_object_unref (stream);
534 }
535
536 static void
537 async_ready_close_callback_wrapper (GObject      *source_object,
538                                     GAsyncResult *res,
539                                     gpointer      user_data)
540 {
541   GInputStream *stream = G_INPUT_STREAM (source_object);
542
543   g_input_stream_clear_pending (stream);
544   stream->priv->closed = TRUE;
545   if (stream->priv->outstanding_callback)
546     (*stream->priv->outstanding_callback) (source_object, res, user_data);
547   g_object_unref (stream);
548 }
549
550 /**
551  * g_input_stream_read_async:
552  * @stream: A #GInputStream.
553  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
554  *     a buffer to read data into (which should be at least count bytes long).
555  * @count: the number of bytes that will be read from the stream
556  * @io_priority: the [I/O priority][io-priority]
557  * of the request. 
558  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
559  * @callback: (scope async): callback to call when the request is satisfied
560  * @user_data: (closure): the data to pass to callback function
561  *
562  * Request an asynchronous read of @count bytes from the stream into the buffer
563  * starting at @buffer. When the operation is finished @callback will be called. 
564  * You can then call g_input_stream_read_finish() to get the result of the 
565  * operation.
566  *
567  * During an async request no other sync and async calls are allowed on @stream, and will
568  * result in %G_IO_ERROR_PENDING errors. 
569  *
570  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
571  *
572  * On success, the number of bytes read into the buffer will be passed to the
573  * callback. It is not an error if this is not the same as the requested size, as it
574  * can happen e.g. near the end of a file, but generally we try to read
575  * as many bytes as requested. Zero is returned on end of file
576  * (or if @count is zero),  but never otherwise.
577  *
578  * Any outstanding i/o request with higher priority (lower numerical value) will
579  * be executed before an outstanding request with lower priority. Default
580  * priority is %G_PRIORITY_DEFAULT.
581  *
582  * The asynchronous methods have a default fallback that uses threads to implement
583  * asynchronicity, so they are optional for inheriting classes. However, if you
584  * override one you must override all.
585  **/
586 void
587 g_input_stream_read_async (GInputStream        *stream,
588                            void                *buffer,
589                            gsize                count,
590                            int                  io_priority,
591                            GCancellable        *cancellable,
592                            GAsyncReadyCallback  callback,
593                            gpointer             user_data)
594 {
595   GInputStreamClass *class;
596   GError *error = NULL;
597
598   g_return_if_fail (G_IS_INPUT_STREAM (stream));
599   g_return_if_fail (buffer != NULL);
600
601   if (count == 0)
602     {
603       GTask *task;
604
605       task = g_task_new (stream, cancellable, callback, user_data);
606       g_task_set_source_tag (task, g_input_stream_read_async);
607       g_task_return_int (task, 0);
608       g_object_unref (task);
609       return;
610     }
611   
612   if (((gssize) count) < 0)
613     {
614       g_task_report_new_error (stream, callback, user_data,
615                                g_input_stream_read_async,
616                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
617                                _("Too large count value passed to %s"),
618                                G_STRFUNC);
619       return;
620     }
621
622   if (!g_input_stream_set_pending (stream, &error))
623     {
624       g_task_report_error (stream, callback, user_data,
625                            g_input_stream_read_async,
626                            error);
627       return;
628     }
629
630   class = G_INPUT_STREAM_GET_CLASS (stream);
631   stream->priv->outstanding_callback = callback;
632   g_object_ref (stream);
633   class->read_async (stream, buffer, count, io_priority, cancellable,
634                      async_ready_callback_wrapper, user_data);
635 }
636
637 /**
638  * g_input_stream_read_finish:
639  * @stream: a #GInputStream.
640  * @result: a #GAsyncResult.
641  * @error: a #GError location to store the error occurring, or %NULL to 
642  * ignore.
643  * 
644  * Finishes an asynchronous stream read operation. 
645  * 
646  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
647  **/
648 gssize
649 g_input_stream_read_finish (GInputStream  *stream,
650                             GAsyncResult  *result,
651                             GError       **error)
652 {
653   GInputStreamClass *class;
654   
655   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
656   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
657
658   if (g_async_result_legacy_propagate_error (result, error))
659     return -1;
660   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
661     return g_task_propagate_int (G_TASK (result), error);
662
663   class = G_INPUT_STREAM_GET_CLASS (stream);
664   return class->read_finish (stream, result, error);
665 }
666
667 typedef struct
668 {
669   gchar *buffer;
670   gsize to_read;
671   gsize bytes_read;
672 } AsyncReadAll;
673
674 static void
675 free_async_read_all (gpointer data)
676 {
677   g_slice_free (AsyncReadAll, data);
678 }
679
680 static void
681 read_all_callback (GObject      *stream,
682                    GAsyncResult *result,
683                    gpointer      user_data)
684 {
685   GTask *task = user_data;
686   AsyncReadAll *data = g_task_get_task_data (task);
687   gboolean got_eof = FALSE;
688
689   if (result)
690     {
691       GError *error = NULL;
692       gssize nread;
693
694       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
695
696       if (nread == -1)
697         {
698           g_task_return_error (task, error);
699           g_object_unref (task);
700           return;
701         }
702
703       g_assert_cmpint (nread, <=, data->to_read);
704       data->to_read -= nread;
705       data->bytes_read += nread;
706       got_eof = (nread == 0);
707     }
708
709   if (got_eof || data->to_read == 0)
710     {
711       g_task_return_boolean (task, TRUE);
712       g_object_unref (task);
713     }
714
715   else
716     g_input_stream_read_async (G_INPUT_STREAM (stream),
717                                data->buffer + data->bytes_read,
718                                data->to_read,
719                                g_task_get_priority (task),
720                                g_task_get_cancellable (task),
721                                read_all_callback, task);
722 }
723
724
725 static void
726 read_all_async_thread (GTask        *task,
727                        gpointer      source_object,
728                        gpointer      task_data,
729                        GCancellable *cancellable)
730 {
731   GInputStream *stream = source_object;
732   AsyncReadAll *data = task_data;
733   GError *error = NULL;
734
735   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
736                                g_task_get_cancellable (task), &error))
737     g_task_return_boolean (task, TRUE);
738   else
739     g_task_return_error (task, error);
740 }
741
742 /**
743  * g_input_stream_read_all_async:
744  * @stream: A #GInputStream
745  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
746  *     a buffer to read data into (which should be at least count bytes long)
747  * @count: the number of bytes that will be read from the stream
748  * @io_priority: the [I/O priority][io-priority] of the request
749  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
750  * @callback: (scope async): callback to call when the request is satisfied
751  * @user_data: (closure): the data to pass to callback function
752  *
753  * Request an asynchronous read of @count bytes from the stream into the
754  * buffer starting at @buffer.
755  *
756  * This is the asynchronous equivalent of g_input_stream_read_all().
757  *
758  * Call g_input_stream_read_all_finish() to collect the result.
759  *
760  * Any outstanding I/O request with higher priority (lower numerical
761  * value) will be executed before an outstanding request with lower
762  * priority. Default priority is %G_PRIORITY_DEFAULT.
763  *
764  * Since: 2.44
765  **/
766 void
767 g_input_stream_read_all_async (GInputStream        *stream,
768                                void                *buffer,
769                                gsize                count,
770                                int                  io_priority,
771                                GCancellable        *cancellable,
772                                GAsyncReadyCallback  callback,
773                                gpointer             user_data)
774 {
775   AsyncReadAll *data;
776   GTask *task;
777
778   g_return_if_fail (G_IS_INPUT_STREAM (stream));
779   g_return_if_fail (buffer != NULL || count == 0);
780
781   task = g_task_new (stream, cancellable, callback, user_data);
782   data = g_slice_new0 (AsyncReadAll);
783   data->buffer = buffer;
784   data->to_read = count;
785
786   g_task_set_source_tag (task, g_input_stream_read_all_async);
787   g_task_set_task_data (task, data, free_async_read_all);
788   g_task_set_priority (task, io_priority);
789
790   /* If async reads are going to be handled via the threadpool anyway
791    * then we may as well do it with a single dispatch instead of
792    * bouncing in and out.
793    */
794   if (g_input_stream_async_read_is_via_threads (stream))
795     {
796       g_task_run_in_thread (task, read_all_async_thread);
797       g_object_unref (task);
798     }
799   else
800     read_all_callback (G_OBJECT (stream), NULL, task);
801 }
802
803 /**
804  * g_input_stream_read_all_finish:
805  * @stream: a #GInputStream
806  * @result: a #GAsyncResult
807  * @bytes_read: (out): location to store the number of bytes that was read from the stream
808  * @error: a #GError location to store the error occurring, or %NULL to ignore
809  *
810  * Finishes an asynchronous stream read operation started with
811  * g_input_stream_read_all_async().
812  *
813  * As a special exception to the normal conventions for functions that
814  * use #GError, if this function returns %FALSE (and sets @error) then
815  * @bytes_read will be set to the number of bytes that were successfully
816  * read before the error was encountered.  This functionality is only
817  * available from C.  If you need it from another language then you must
818  * write your own loop around g_input_stream_read_async().
819  *
820  * Returns: %TRUE on success, %FALSE if there was an error
821  *
822  * Since: 2.44
823  **/
824 gboolean
825 g_input_stream_read_all_finish (GInputStream  *stream,
826                                 GAsyncResult  *result,
827                                 gsize         *bytes_read,
828                                 GError       **error)
829 {
830   GTask *task;
831
832   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
833   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
834
835   task = G_TASK (result);
836
837   if (bytes_read)
838     {
839       AsyncReadAll *data = g_task_get_task_data (task);
840
841       *bytes_read = data->bytes_read;
842     }
843
844   return g_task_propagate_boolean (task, error);
845 }
846
847 static void
848 read_bytes_callback (GObject      *stream,
849                      GAsyncResult *result,
850                      gpointer      user_data)
851 {
852   GTask *task = user_data;
853   guchar *buf = g_task_get_task_data (task);
854   GError *error = NULL;
855   gssize nread;
856   GBytes *bytes = NULL;
857
858   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
859                                       result, &error);
860   if (nread == -1)
861     {
862       g_free (buf);
863       g_task_return_error (task, error);
864     }
865   else if (nread == 0)
866     {
867       g_free (buf);
868       bytes = g_bytes_new_static ("", 0);
869     }
870   else
871     bytes = g_bytes_new_take (buf, nread);
872
873   if (bytes)
874     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
875
876   g_object_unref (task);
877 }
878
879 /**
880  * g_input_stream_read_bytes_async:
881  * @stream: A #GInputStream.
882  * @count: the number of bytes that will be read from the stream
883  * @io_priority: the [I/O priority][io-priority] of the request
884  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
885  * @callback: (scope async): callback to call when the request is satisfied
886  * @user_data: (closure): the data to pass to callback function
887  *
888  * Request an asynchronous read of @count bytes from the stream into a
889  * new #GBytes. When the operation is finished @callback will be
890  * called. You can then call g_input_stream_read_bytes_finish() to get the
891  * result of the operation.
892  *
893  * During an async request no other sync and async calls are allowed
894  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
895  *
896  * A value of @count larger than %G_MAXSSIZE will cause a
897  * %G_IO_ERROR_INVALID_ARGUMENT error.
898  *
899  * On success, the new #GBytes will be passed to the callback. It is
900  * not an error if this is smaller than the requested size, as it can
901  * happen e.g. near the end of a file, but generally we try to read as
902  * many bytes as requested. Zero is returned on end of file (or if
903  * @count is zero), but never otherwise.
904  *
905  * Any outstanding I/O request with higher priority (lower numerical
906  * value) will be executed before an outstanding request with lower
907  * priority. Default priority is %G_PRIORITY_DEFAULT.
908  *
909  * Since: 2.34
910  **/
911 void
912 g_input_stream_read_bytes_async (GInputStream          *stream,
913                                  gsize                  count,
914                                  int                    io_priority,
915                                  GCancellable          *cancellable,
916                                  GAsyncReadyCallback    callback,
917                                  gpointer               user_data)
918 {
919   GTask *task;
920   guchar *buf;
921
922   task = g_task_new (stream, cancellable, callback, user_data);
923   g_task_set_source_tag (task, g_input_stream_read_bytes_async);
924
925   buf = g_malloc (count);
926   g_task_set_task_data (task, buf, NULL);
927
928   g_input_stream_read_async (stream, buf, count,
929                              io_priority, cancellable,
930                              read_bytes_callback, task);
931 }
932
933 /**
934  * g_input_stream_read_bytes_finish:
935  * @stream: a #GInputStream.
936  * @result: a #GAsyncResult.
937  * @error: a #GError location to store the error occurring, or %NULL to
938  *   ignore.
939  *
940  * Finishes an asynchronous stream read-into-#GBytes operation.
941  *
942  * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
943  *
944  * Since: 2.34
945  **/
946 GBytes *
947 g_input_stream_read_bytes_finish (GInputStream  *stream,
948                                   GAsyncResult  *result,
949                                   GError       **error)
950 {
951   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
952   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
953
954   return g_task_propagate_pointer (G_TASK (result), error);
955 }
956
957 /**
958  * g_input_stream_skip_async:
959  * @stream: A #GInputStream.
960  * @count: the number of bytes that will be skipped from the stream
961  * @io_priority: the [I/O priority][io-priority] of the request
962  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
963  * @callback: (scope async): callback to call when the request is satisfied
964  * @user_data: (closure): the data to pass to callback function
965  *
966  * Request an asynchronous skip of @count bytes from the stream.
967  * When the operation is finished @callback will be called.
968  * You can then call g_input_stream_skip_finish() to get the result
969  * of the operation.
970  *
971  * During an async request no other sync and async calls are allowed,
972  * and will result in %G_IO_ERROR_PENDING errors.
973  *
974  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
975  *
976  * On success, the number of bytes skipped will be passed to the callback.
977  * It is not an error if this is not the same as the requested size, as it
978  * can happen e.g. near the end of a file, but generally we try to skip
979  * as many bytes as requested. Zero is returned on end of file
980  * (or if @count is zero), but never otherwise.
981  *
982  * Any outstanding i/o request with higher priority (lower numerical value)
983  * will be executed before an outstanding request with lower priority.
984  * Default priority is %G_PRIORITY_DEFAULT.
985  *
986  * The asynchronous methods have a default fallback that uses threads to
987  * implement asynchronicity, so they are optional for inheriting classes.
988  * However, if you override one, you must override all.
989  **/
990 void
991 g_input_stream_skip_async (GInputStream        *stream,
992                            gsize                count,
993                            int                  io_priority,
994                            GCancellable        *cancellable,
995                            GAsyncReadyCallback  callback,
996                            gpointer             user_data)
997 {
998   GInputStreamClass *class;
999   GError *error = NULL;
1000
1001   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1002
1003   if (count == 0)
1004     {
1005       GTask *task;
1006
1007       task = g_task_new (stream, cancellable, callback, user_data);
1008       g_task_set_source_tag (task, g_input_stream_skip_async);
1009       g_task_return_int (task, 0);
1010       g_object_unref (task);
1011       return;
1012     }
1013   
1014   if (((gssize) count) < 0)
1015     {
1016       g_task_report_new_error (stream, callback, user_data,
1017                                g_input_stream_skip_async,
1018                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1019                                _("Too large count value passed to %s"),
1020                                G_STRFUNC);
1021       return;
1022     }
1023
1024   if (!g_input_stream_set_pending (stream, &error))
1025     {
1026       g_task_report_error (stream, callback, user_data,
1027                            g_input_stream_skip_async,
1028                            error);
1029       return;
1030     }
1031
1032   class = G_INPUT_STREAM_GET_CLASS (stream);
1033   stream->priv->outstanding_callback = callback;
1034   g_object_ref (stream);
1035   class->skip_async (stream, count, io_priority, cancellable,
1036                      async_ready_callback_wrapper, user_data);
1037 }
1038
1039 /**
1040  * g_input_stream_skip_finish:
1041  * @stream: a #GInputStream.
1042  * @result: a #GAsyncResult.
1043  * @error: a #GError location to store the error occurring, or %NULL to 
1044  * ignore.
1045  * 
1046  * Finishes a stream skip operation.
1047  * 
1048  * Returns: the size of the bytes skipped, or `-1` on error.
1049  **/
1050 gssize
1051 g_input_stream_skip_finish (GInputStream  *stream,
1052                             GAsyncResult  *result,
1053                             GError       **error)
1054 {
1055   GInputStreamClass *class;
1056
1057   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1058   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1059
1060   if (g_async_result_legacy_propagate_error (result, error))
1061     return -1;
1062   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1063     return g_task_propagate_int (G_TASK (result), error);
1064
1065   class = G_INPUT_STREAM_GET_CLASS (stream);
1066   return class->skip_finish (stream, result, error);
1067 }
1068
1069 /**
1070  * g_input_stream_close_async:
1071  * @stream: A #GInputStream.
1072  * @io_priority: the [I/O priority][io-priority] of the request
1073  * @cancellable: (nullable): optional cancellable object
1074  * @callback: (scope async): callback to call when the request is satisfied
1075  * @user_data: (closure): the data to pass to callback function
1076  *
1077  * Requests an asynchronous closes of the stream, releasing resources related to it.
1078  * When the operation is finished @callback will be called. 
1079  * You can then call g_input_stream_close_finish() to get the result of the 
1080  * operation.
1081  *
1082  * For behaviour details see g_input_stream_close().
1083  *
1084  * The asynchronous methods have a default fallback that uses threads to implement
1085  * asynchronicity, so they are optional for inheriting classes. However, if you
1086  * override one you must override all.
1087  **/
1088 void
1089 g_input_stream_close_async (GInputStream        *stream,
1090                             int                  io_priority,
1091                             GCancellable        *cancellable,
1092                             GAsyncReadyCallback  callback,
1093                             gpointer             user_data)
1094 {
1095   GInputStreamClass *class;
1096   GError *error = NULL;
1097
1098   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1099
1100   if (stream->priv->closed)
1101     {
1102       GTask *task;
1103
1104       task = g_task_new (stream, cancellable, callback, user_data);
1105       g_task_set_source_tag (task, g_input_stream_close_async);
1106       g_task_return_boolean (task, TRUE);
1107       g_object_unref (task);
1108       return;
1109     }
1110
1111   if (!g_input_stream_set_pending (stream, &error))
1112     {
1113       g_task_report_error (stream, callback, user_data,
1114                            g_input_stream_close_async,
1115                            error);
1116       return;
1117     }
1118   
1119   class = G_INPUT_STREAM_GET_CLASS (stream);
1120   stream->priv->outstanding_callback = callback;
1121   g_object_ref (stream);
1122   class->close_async (stream, io_priority, cancellable,
1123                       async_ready_close_callback_wrapper, user_data);
1124 }
1125
1126 /**
1127  * g_input_stream_close_finish:
1128  * @stream: a #GInputStream.
1129  * @result: a #GAsyncResult.
1130  * @error: a #GError location to store the error occurring, or %NULL to 
1131  * ignore.
1132  * 
1133  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1134  * 
1135  * Returns: %TRUE if the stream was closed successfully.
1136  **/
1137 gboolean
1138 g_input_stream_close_finish (GInputStream  *stream,
1139                              GAsyncResult  *result,
1140                              GError       **error)
1141 {
1142   GInputStreamClass *class;
1143
1144   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1145   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1146
1147   if (g_async_result_legacy_propagate_error (result, error))
1148     return FALSE;
1149   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1150     return g_task_propagate_boolean (G_TASK (result), error);
1151
1152   class = G_INPUT_STREAM_GET_CLASS (stream);
1153   return class->close_finish (stream, result, error);
1154 }
1155
1156 /**
1157  * g_input_stream_is_closed:
1158  * @stream: input stream.
1159  * 
1160  * Checks if an input stream is closed.
1161  * 
1162  * Returns: %TRUE if the stream is closed.
1163  **/
1164 gboolean
1165 g_input_stream_is_closed (GInputStream *stream)
1166 {
1167   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1168   
1169   return stream->priv->closed;
1170 }
1171  
1172 /**
1173  * g_input_stream_has_pending:
1174  * @stream: input stream.
1175  * 
1176  * Checks if an input stream has pending actions.
1177  * 
1178  * Returns: %TRUE if @stream has pending actions.
1179  **/  
1180 gboolean
1181 g_input_stream_has_pending (GInputStream *stream)
1182 {
1183   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1184   
1185   return stream->priv->pending;
1186 }
1187
1188 /**
1189  * g_input_stream_set_pending:
1190  * @stream: input stream
1191  * @error: a #GError location to store the error occurring, or %NULL to 
1192  * ignore.
1193  * 
1194  * Sets @stream to have actions pending. If the pending flag is
1195  * already set or @stream is closed, it will return %FALSE and set
1196  * @error.
1197  *
1198  * Returns: %TRUE if pending was previously unset and is now set.
1199  **/
1200 gboolean
1201 g_input_stream_set_pending (GInputStream *stream, GError **error)
1202 {
1203   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1204   
1205   if (stream->priv->closed)
1206     {
1207       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1208                            _("Stream is already closed"));
1209       return FALSE;
1210     }
1211   
1212   if (stream->priv->pending)
1213     {
1214       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1215                 /* Translators: This is an error you get if there is already an
1216                  * operation running against this stream when you try to start
1217                  * one */
1218                  _("Stream has outstanding operation"));
1219       return FALSE;
1220     }
1221   
1222   stream->priv->pending = TRUE;
1223   return TRUE;
1224 }
1225
1226 /**
1227  * g_input_stream_clear_pending:
1228  * @stream: input stream
1229  * 
1230  * Clears the pending flag on @stream.
1231  **/
1232 void
1233 g_input_stream_clear_pending (GInputStream *stream)
1234 {
1235   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1236   
1237   stream->priv->pending = FALSE;
1238 }
1239
1240 /*< internal >
1241  * g_input_stream_async_read_is_via_threads:
1242  * @stream: input stream
1243  *
1244  * Checks if an input stream's read_async function uses threads.
1245  *
1246  * Returns: %TRUE if @stream's read_async function uses threads.
1247  **/
1248 gboolean
1249 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1250 {
1251   GInputStreamClass *class;
1252
1253   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1254
1255   class = G_INPUT_STREAM_GET_CLASS (stream);
1256
1257   return (class->read_async == g_input_stream_real_read_async &&
1258       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1259         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1260 }
1261
1262 /*< internal >
1263  * g_input_stream_async_close_is_via_threads:
1264  * @stream: input stream
1265  *
1266  * Checks if an input stream's close_async function uses threads.
1267  *
1268  * Returns: %TRUE if @stream's close_async function uses threads.
1269  **/
1270 gboolean
1271 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1272 {
1273   GInputStreamClass *class;
1274
1275   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1276
1277   class = G_INPUT_STREAM_GET_CLASS (stream);
1278
1279   return class->close_async == g_input_stream_real_close_async;
1280 }
1281
1282 /********************************************
1283  *   Default implementation of async ops    *
1284  ********************************************/
1285
1286 typedef struct {
1287   void   *buffer;
1288   gsize   count;
1289 } ReadData;
1290
1291 static void
1292 free_read_data (ReadData *op)
1293 {
1294   g_slice_free (ReadData, op);
1295 }
1296
1297 static void
1298 read_async_thread (GTask        *task,
1299                    gpointer      source_object,
1300                    gpointer      task_data,
1301                    GCancellable *cancellable)
1302 {
1303   GInputStream *stream = source_object;
1304   ReadData *op = task_data;
1305   GInputStreamClass *class;
1306   GError *error = NULL;
1307   gssize nread;
1308  
1309   class = G_INPUT_STREAM_GET_CLASS (stream);
1310
1311   nread = class->read_fn (stream,
1312                           op->buffer, op->count,
1313                           g_task_get_cancellable (task),
1314                           &error);
1315   if (nread == -1)
1316     g_task_return_error (task, error);
1317   else
1318     g_task_return_int (task, nread);
1319 }
1320
1321 static void read_async_pollable (GPollableInputStream *stream,
1322                                  GTask                *task);
1323
1324 static gboolean
1325 read_async_pollable_ready (GPollableInputStream *stream,
1326                            gpointer              user_data)
1327 {
1328   GTask *task = user_data;
1329
1330   read_async_pollable (stream, task);
1331   return FALSE;
1332 }
1333
1334 static void
1335 read_async_pollable (GPollableInputStream *stream,
1336                      GTask                *task)
1337 {
1338   ReadData *op = g_task_get_task_data (task);
1339   GError *error = NULL;
1340   gssize nread;
1341
1342   if (g_task_return_error_if_cancelled (task))
1343     return;
1344
1345   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1346     read_nonblocking (stream, op->buffer, op->count, &error);
1347
1348   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1349     {
1350       GSource *source;
1351
1352       g_error_free (error);
1353
1354       source = g_pollable_input_stream_create_source (stream,
1355                                                       g_task_get_cancellable (task));
1356       g_task_attach_source (task, source,
1357                             (GSourceFunc) read_async_pollable_ready);
1358       g_source_unref (source);
1359       return;
1360     }
1361
1362   if (nread == -1)
1363     g_task_return_error (task, error);
1364   else
1365     g_task_return_int (task, nread);
1366   /* g_input_stream_real_read_async() unrefs task */
1367 }
1368
1369
1370 static void
1371 g_input_stream_real_read_async (GInputStream        *stream,
1372                                 void                *buffer,
1373                                 gsize                count,
1374                                 int                  io_priority,
1375                                 GCancellable        *cancellable,
1376                                 GAsyncReadyCallback  callback,
1377                                 gpointer             user_data)
1378 {
1379   GTask *task;
1380   ReadData *op;
1381   
1382   op = g_slice_new0 (ReadData);
1383   task = g_task_new (stream, cancellable, callback, user_data);
1384   g_task_set_source_tag (task, g_input_stream_real_read_async);
1385   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1386   g_task_set_priority (task, io_priority);
1387   op->buffer = buffer;
1388   op->count = count;
1389
1390   if (!g_input_stream_async_read_is_via_threads (stream))
1391     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1392   else
1393     g_task_run_in_thread (task, read_async_thread);
1394   g_object_unref (task);
1395 }
1396
1397 static gssize
1398 g_input_stream_real_read_finish (GInputStream  *stream,
1399                                  GAsyncResult  *result,
1400                                  GError       **error)
1401 {
1402   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1403
1404   return g_task_propagate_int (G_TASK (result), error);
1405 }
1406
1407
1408 static void
1409 skip_async_thread (GTask        *task,
1410                    gpointer      source_object,
1411                    gpointer      task_data,
1412                    GCancellable *cancellable)
1413 {
1414   GInputStream *stream = source_object;
1415   gsize count = GPOINTER_TO_SIZE (task_data);
1416   GInputStreamClass *class;
1417   GError *error = NULL;
1418   gssize ret;
1419
1420   class = G_INPUT_STREAM_GET_CLASS (stream);
1421   ret = class->skip (stream, count,
1422                      g_task_get_cancellable (task),
1423                      &error);
1424   if (ret == -1)
1425     g_task_return_error (task, error);
1426   else
1427     g_task_return_int (task, ret);
1428 }
1429
1430 typedef struct {
1431   char buffer[8192];
1432   gsize count;
1433   gsize count_skipped;
1434 } SkipFallbackAsyncData;
1435
1436 static void
1437 skip_callback_wrapper (GObject      *source_object,
1438                        GAsyncResult *res,
1439                        gpointer      user_data)
1440 {
1441   GInputStreamClass *class;
1442   GTask *task = user_data;
1443   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1444   GError *error = NULL;
1445   gssize ret;
1446
1447   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1448
1449   if (ret > 0)
1450     {
1451       data->count -= ret;
1452       data->count_skipped += ret;
1453
1454       if (data->count > 0)
1455         {
1456           class = G_INPUT_STREAM_GET_CLASS (source_object);
1457           class->read_async (G_INPUT_STREAM (source_object),
1458                              data->buffer, MIN (8192, data->count),
1459                              g_task_get_priority (task),
1460                              g_task_get_cancellable (task),
1461                              skip_callback_wrapper, task);
1462           return;
1463         }
1464     }
1465
1466   if (ret == -1 &&
1467       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1468       data->count_skipped)
1469     {
1470       /* No error, return partial read */
1471       g_clear_error (&error);
1472     }
1473
1474   if (error)
1475     g_task_return_error (task, error);
1476   else
1477     g_task_return_int (task, data->count_skipped);
1478   g_object_unref (task);
1479  }
1480
1481 static void
1482 g_input_stream_real_skip_async (GInputStream        *stream,
1483                                 gsize                count,
1484                                 int                  io_priority,
1485                                 GCancellable        *cancellable,
1486                                 GAsyncReadyCallback  callback,
1487                                 gpointer             user_data)
1488 {
1489   GInputStreamClass *class;
1490   SkipFallbackAsyncData *data;
1491   GTask *task;
1492
1493   class = G_INPUT_STREAM_GET_CLASS (stream);
1494
1495   task = g_task_new (stream, cancellable, callback, user_data);
1496   g_task_set_source_tag (task, g_input_stream_real_skip_async);
1497   g_task_set_priority (task, io_priority);
1498
1499   if (g_input_stream_async_read_is_via_threads (stream))
1500     {
1501       /* Read is thread-using async fallback.
1502        * Make skip use threads too, so that we can use a possible sync skip
1503        * implementation. */
1504       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1505
1506       g_task_run_in_thread (task, skip_async_thread);
1507       g_object_unref (task);
1508     }
1509   else
1510     {
1511       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1512       
1513       /* There is a custom async read function, lets use that. */
1514       data = g_new (SkipFallbackAsyncData, 1);
1515       data->count = count;
1516       data->count_skipped = 0;
1517       g_task_set_task_data (task, data, g_free);
1518       g_task_set_check_cancellable (task, FALSE);
1519       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1520                          skip_callback_wrapper, task);
1521     }
1522
1523 }
1524
1525 static gssize
1526 g_input_stream_real_skip_finish (GInputStream  *stream,
1527                                  GAsyncResult  *result,
1528                                  GError       **error)
1529 {
1530   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1531
1532   return g_task_propagate_int (G_TASK (result), error);
1533 }
1534
1535 static void
1536 close_async_thread (GTask        *task,
1537                     gpointer      source_object,
1538                     gpointer      task_data,
1539                     GCancellable *cancellable)
1540 {
1541   GInputStream *stream = source_object;
1542   GInputStreamClass *class;
1543   GError *error = NULL;
1544   gboolean result;
1545
1546   class = G_INPUT_STREAM_GET_CLASS (stream);
1547   if (class->close_fn)
1548     {
1549       result = class->close_fn (stream,
1550                                 g_task_get_cancellable (task),
1551                                 &error);
1552       if (!result)
1553         {
1554           g_task_return_error (task, error);
1555           return;
1556         }
1557     }
1558
1559   g_task_return_boolean (task, TRUE);
1560 }
1561
1562 static void
1563 g_input_stream_real_close_async (GInputStream        *stream,
1564                                  int                  io_priority,
1565                                  GCancellable        *cancellable,
1566                                  GAsyncReadyCallback  callback,
1567                                  gpointer             user_data)
1568 {
1569   GTask *task;
1570
1571   task = g_task_new (stream, cancellable, callback, user_data);
1572   g_task_set_source_tag (task, g_input_stream_real_close_async);
1573   g_task_set_check_cancellable (task, FALSE);
1574   g_task_set_priority (task, io_priority);
1575   
1576   g_task_run_in_thread (task, close_async_thread);
1577   g_object_unref (task);
1578 }
1579
1580 static gboolean
1581 g_input_stream_real_close_finish (GInputStream  *stream,
1582                                   GAsyncResult  *result,
1583                                   GError       **error)
1584 {
1585   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1586
1587   return g_task_propagate_boolean (G_TASK (result), error);
1588 }