Imported Upstream version 2.72.3
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice().
44  *
45  * See the documentation for #GIOStream for details of thread safety of
46  * streaming APIs.
47  *
48  * All of these functions have async variants too.
49  **/
50
51 struct _GInputStreamPrivate {
52   guint closed : 1;
53   guint pending : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
58
59 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
60                                                   gsize                 count,
61                                                   GCancellable         *cancellable,
62                                                   GError              **error);
63 static void     g_input_stream_real_read_async   (GInputStream         *stream,
64                                                   void                 *buffer,
65                                                   gsize                 count,
66                                                   int                   io_priority,
67                                                   GCancellable         *cancellable,
68                                                   GAsyncReadyCallback   callback,
69                                                   gpointer              user_data);
70 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
71                                                   GAsyncResult         *result,
72                                                   GError              **error);
73 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
74                                                   gsize                 count,
75                                                   int                   io_priority,
76                                                   GCancellable         *cancellable,
77                                                   GAsyncReadyCallback   callback,
78                                                   gpointer              data);
79 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
80                                                   GAsyncResult         *result,
81                                                   GError              **error);
82 static void     g_input_stream_real_close_async  (GInputStream         *stream,
83                                                   int                   io_priority,
84                                                   GCancellable         *cancellable,
85                                                   GAsyncReadyCallback   callback,
86                                                   gpointer              data);
87 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
88                                                   GAsyncResult         *result,
89                                                   GError              **error);
90
91 static void
92 g_input_stream_dispose (GObject *object)
93 {
94   GInputStream *stream;
95
96   stream = G_INPUT_STREAM (object);
97   
98   if (!stream->priv->closed)
99     g_input_stream_close (stream, NULL, NULL);
100
101   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
102 }
103
104
105 static void
106 g_input_stream_class_init (GInputStreamClass *klass)
107 {
108   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
109   
110   gobject_class->dispose = g_input_stream_dispose;
111   
112   klass->skip = g_input_stream_real_skip;
113   klass->read_async = g_input_stream_real_read_async;
114   klass->read_finish = g_input_stream_real_read_finish;
115   klass->skip_async = g_input_stream_real_skip_async;
116   klass->skip_finish = g_input_stream_real_skip_finish;
117   klass->close_async = g_input_stream_real_close_async;
118   klass->close_finish = g_input_stream_real_close_finish;
119 }
120
121 static void
122 g_input_stream_init (GInputStream *stream)
123 {
124   stream->priv = g_input_stream_get_instance_private (stream);
125 }
126
127 /**
128  * g_input_stream_read:
129  * @stream: a #GInputStream.
130  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
131  *     a buffer to read data into (which should be at least count bytes long).
132  * @count: (in): the number of bytes that will be read from the stream
133  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
134  * @error: location to store the error occurring, or %NULL to ignore
135  *
136  * Tries to read @count bytes from the stream into the buffer starting at
137  * @buffer. Will block during this read.
138  * 
139  * If count is zero returns zero and does nothing. A value of @count
140  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
141  *
142  * On success, the number of bytes read into the buffer is returned.
143  * It is not an error if this is not the same as the requested size, as it
144  * can happen e.g. near the end of a file. Zero is returned on end of file
145  * (or if @count is zero),  but never otherwise.
146  *
147  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
148  * at any position, and this function doesn't nul-terminate the @buffer.
149  *
150  * If @cancellable is not %NULL, then the operation can be cancelled by
151  * triggering the cancellable object from another thread. If the operation
152  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
153  * operation was partially finished when the operation was cancelled the
154  * partial result will be returned, without an error.
155  *
156  * On error -1 is returned and @error is set accordingly.
157  * 
158  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
159  **/
160 gssize
161 g_input_stream_read  (GInputStream  *stream,
162                       void          *buffer,
163                       gsize          count,
164                       GCancellable  *cancellable,
165                       GError       **error)
166 {
167   GInputStreamClass *class;
168   gssize res;
169
170   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
171   g_return_val_if_fail (buffer != NULL, 0);
172
173   if (count == 0)
174     return 0;
175   
176   if (((gssize) count) < 0)
177     {
178       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
179                    _("Too large count value passed to %s"), G_STRFUNC);
180       return -1;
181     }
182
183   class = G_INPUT_STREAM_GET_CLASS (stream);
184
185   if (class->read_fn == NULL) 
186     {
187       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
188                            _("Input stream doesn’t implement read"));
189       return -1;
190     }
191
192   if (!g_input_stream_set_pending (stream, error))
193     return -1;
194
195   if (cancellable)
196     g_cancellable_push_current (cancellable);
197   
198   res = class->read_fn (stream, buffer, count, cancellable, error);
199
200   if (cancellable)
201     g_cancellable_pop_current (cancellable);
202   
203   g_input_stream_clear_pending (stream);
204
205   return res;
206 }
207
208 /**
209  * g_input_stream_read_all:
210  * @stream: a #GInputStream.
211  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
212  *     a buffer to read data into (which should be at least count bytes long).
213  * @count: (in): the number of bytes that will be read from the stream
214  * @bytes_read: (out): location to store the number of bytes that was read from the stream
215  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
216  * @error: location to store the error occurring, or %NULL to ignore
217  *
218  * Tries to read @count bytes from the stream into the buffer starting at
219  * @buffer. Will block during this read.
220  *
221  * This function is similar to g_input_stream_read(), except it tries to
222  * read as many bytes as requested, only stopping on an error or end of stream.
223  *
224  * On a successful read of @count bytes, or if we reached the end of the
225  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
226  * read into @buffer.
227  * 
228  * If there is an error during the operation %FALSE is returned and @error
229  * is set to indicate the error status.
230  *
231  * As a special exception to the normal conventions for functions that
232  * use #GError, if this function returns %FALSE (and sets @error) then
233  * @bytes_read will be set to the number of bytes that were successfully
234  * read before the error was encountered.  This functionality is only
235  * available from C.  If you need it from another language then you must
236  * write your own loop around g_input_stream_read().
237  *
238  * Returns: %TRUE on success, %FALSE if there was an error
239  **/
240 gboolean
241 g_input_stream_read_all (GInputStream  *stream,
242                          void          *buffer,
243                          gsize          count,
244                          gsize         *bytes_read,
245                          GCancellable  *cancellable,
246                          GError       **error)
247 {
248   gsize _bytes_read;
249   gssize res;
250
251   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
252   g_return_val_if_fail (buffer != NULL, FALSE);
253
254   _bytes_read = 0;
255   while (_bytes_read < count)
256     {
257       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
258                                  cancellable, error);
259       if (res == -1)
260         {
261           if (bytes_read)
262             *bytes_read = _bytes_read;
263           return FALSE;
264         }
265       
266       if (res == 0)
267         break;
268
269       _bytes_read += res;
270     }
271
272   if (bytes_read)
273     *bytes_read = _bytes_read;
274   return TRUE;
275 }
276
277 /**
278  * g_input_stream_read_bytes:
279  * @stream: a #GInputStream.
280  * @count: maximum number of bytes that will be read from the stream. Common
281  * values include 4096 and 8192.
282  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
283  * @error: location to store the error occurring, or %NULL to ignore
284  *
285  * Like g_input_stream_read(), this tries to read @count bytes from
286  * the stream in a blocking fashion. However, rather than reading into
287  * a user-supplied buffer, this will create a new #GBytes containing
288  * the data that was read. This may be easier to use from language
289  * bindings.
290  *
291  * If count is zero, returns a zero-length #GBytes and does nothing. A
292  * value of @count larger than %G_MAXSSIZE will cause a
293  * %G_IO_ERROR_INVALID_ARGUMENT error.
294  *
295  * On success, a new #GBytes is returned. It is not an error if the
296  * size of this object is not the same as the requested size, as it
297  * can happen e.g. near the end of a file. A zero-length #GBytes is
298  * returned on end of file (or if @count is zero), but never
299  * otherwise.
300  *
301  * If @cancellable is not %NULL, then the operation can be cancelled by
302  * triggering the cancellable object from another thread. If the operation
303  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304  * operation was partially finished when the operation was cancelled the
305  * partial result will be returned, without an error.
306  *
307  * On error %NULL is returned and @error is set accordingly.
308  *
309  * Returns: (transfer full): a new #GBytes, or %NULL on error
310  *
311  * Since: 2.34
312  **/
313 GBytes *
314 g_input_stream_read_bytes (GInputStream  *stream,
315                            gsize          count,
316                            GCancellable  *cancellable,
317                            GError       **error)
318 {
319   guchar *buf;
320   gssize nread;
321
322   buf = g_malloc (count);
323   nread = g_input_stream_read (stream, buf, count, cancellable, error);
324   if (nread == -1)
325     {
326       g_free (buf);
327       return NULL;
328     }
329   else if (nread == 0)
330     {
331       g_free (buf);
332       return g_bytes_new_static ("", 0);
333     }
334   else
335     return g_bytes_new_take (buf, nread);
336 }
337
338 /**
339  * g_input_stream_skip:
340  * @stream: a #GInputStream.
341  * @count: the number of bytes that will be skipped from the stream
342  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
343  * @error: location to store the error occurring, or %NULL to ignore
344  *
345  * Tries to skip @count bytes from the stream. Will block during the operation.
346  *
347  * This is identical to g_input_stream_read(), from a behaviour standpoint,
348  * but the bytes that are skipped are not returned to the user. Some
349  * streams have an implementation that is more efficient than reading the data.
350  *
351  * This function is optional for inherited classes, as the default implementation
352  * emulates it using read.
353  *
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
357  * operation was partially finished when the operation was cancelled the
358  * partial result will be returned, without an error.
359  *
360  * Returns: Number of bytes skipped, or -1 on error
361  **/
362 gssize
363 g_input_stream_skip (GInputStream  *stream,
364                      gsize          count,
365                      GCancellable  *cancellable,
366                      GError       **error)
367 {
368   GInputStreamClass *class;
369   gssize res;
370
371   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
372
373   if (count == 0)
374     return 0;
375
376   if (((gssize) count) < 0)
377     {
378       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
379                    _("Too large count value passed to %s"), G_STRFUNC);
380       return -1;
381     }
382   
383   class = G_INPUT_STREAM_GET_CLASS (stream);
384
385   if (!g_input_stream_set_pending (stream, error))
386     return -1;
387
388   if (cancellable)
389     g_cancellable_push_current (cancellable);
390   
391   res = class->skip (stream, count, cancellable, error);
392
393   if (cancellable)
394     g_cancellable_pop_current (cancellable);
395   
396   g_input_stream_clear_pending (stream);
397
398   return res;
399 }
400
401 static gssize
402 g_input_stream_real_skip (GInputStream  *stream,
403                           gsize          count,
404                           GCancellable  *cancellable,
405                           GError       **error)
406 {
407   GInputStreamClass *class;
408   gssize ret, read_bytes;
409   char buffer[8192];
410   GError *my_error;
411
412   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
413     {
414       GSeekable *seekable = G_SEEKABLE (stream);
415       goffset start, end;
416       gboolean success;
417
418       /* g_seekable_seek() may try to set pending itself */
419       stream->priv->pending = FALSE;
420
421       start = g_seekable_tell (seekable);
422
423       if (g_seekable_seek (G_SEEKABLE (stream),
424                            0,
425                            G_SEEK_END,
426                            cancellable,
427                            NULL))
428         {
429           end = g_seekable_tell (seekable);
430           g_assert (start >= 0);
431           g_assert (end >= start);
432           if (start > (goffset) (G_MAXOFFSET - count) ||
433               (goffset) (start + count) > end)
434             {
435               stream->priv->pending = TRUE;
436               return end - start;
437             }
438
439           success = g_seekable_seek (G_SEEKABLE (stream),
440                                      start + count,
441                                      G_SEEK_SET,
442                                      cancellable,
443                                      error);
444           stream->priv->pending = TRUE;
445
446           if (success)
447             return count;
448           else
449             return -1;
450         }
451     }
452
453   /* If not seekable, or seek failed, fall back to reading data: */
454
455   class = G_INPUT_STREAM_GET_CLASS (stream);
456
457   read_bytes = 0;
458   while (1)
459     {
460       my_error = NULL;
461
462       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
463                             cancellable, &my_error);
464       if (ret == -1)
465         {
466           if (read_bytes > 0 &&
467               my_error->domain == G_IO_ERROR &&
468               my_error->code == G_IO_ERROR_CANCELLED)
469             {
470               g_error_free (my_error);
471               return read_bytes;
472             }
473
474           g_propagate_error (error, my_error);
475           return -1;
476         }
477
478       count -= ret;
479       read_bytes += ret;
480
481       if (ret == 0 || count == 0)
482         return read_bytes;
483     }
484 }
485
486 /**
487  * g_input_stream_close:
488  * @stream: A #GInputStream.
489  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
490  * @error: location to store the error occurring, or %NULL to ignore
491  *
492  * Closes the stream, releasing resources related to it.
493  *
494  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
495  * Closing a stream multiple times will not return an error.
496  *
497  * Streams will be automatically closed when the last reference
498  * is dropped, but you might want to call this function to make sure 
499  * resources are released as early as possible.
500  *
501  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
502  * open after the stream is closed. See the documentation for the individual
503  * stream for details.
504  *
505  * On failure the first error that happened will be reported, but the close
506  * operation will finish as much as possible. A stream that failed to
507  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
508  * is important to check and report the error to the user.
509  *
510  * If @cancellable is not %NULL, then the operation can be cancelled by
511  * triggering the cancellable object from another thread. If the operation
512  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
513  * Cancelling a close will still leave the stream closed, but some streams
514  * can use a faster close that doesn't block to e.g. check errors. 
515  *
516  * Returns: %TRUE on success, %FALSE on failure
517  **/
518 gboolean
519 g_input_stream_close (GInputStream  *stream,
520                       GCancellable  *cancellable,
521                       GError       **error)
522 {
523   GInputStreamClass *class;
524   gboolean res;
525
526   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
527
528   class = G_INPUT_STREAM_GET_CLASS (stream);
529
530   if (stream->priv->closed)
531     return TRUE;
532
533   res = TRUE;
534
535   if (!g_input_stream_set_pending (stream, error))
536     return FALSE;
537
538   if (cancellable)
539     g_cancellable_push_current (cancellable);
540
541   if (class->close_fn)
542     res = class->close_fn (stream, cancellable, error);
543
544   if (cancellable)
545     g_cancellable_pop_current (cancellable);
546
547   g_input_stream_clear_pending (stream);
548   
549   stream->priv->closed = TRUE;
550   
551   return res;
552 }
553
554 static void
555 async_ready_callback_wrapper (GObject      *source_object,
556                               GAsyncResult *res,
557                               gpointer      user_data)
558 {
559   GInputStream *stream = G_INPUT_STREAM (source_object);
560
561   g_input_stream_clear_pending (stream);
562   if (stream->priv->outstanding_callback)
563     (*stream->priv->outstanding_callback) (source_object, res, user_data);
564   g_object_unref (stream);
565 }
566
567 static void
568 async_ready_close_callback_wrapper (GObject      *source_object,
569                                     GAsyncResult *res,
570                                     gpointer      user_data)
571 {
572   GInputStream *stream = G_INPUT_STREAM (source_object);
573
574   g_input_stream_clear_pending (stream);
575   stream->priv->closed = TRUE;
576   if (stream->priv->outstanding_callback)
577     (*stream->priv->outstanding_callback) (source_object, res, user_data);
578   g_object_unref (stream);
579 }
580
581 /**
582  * g_input_stream_read_async:
583  * @stream: A #GInputStream.
584  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
585  *     a buffer to read data into (which should be at least count bytes long).
586  * @count: (in): the number of bytes that will be read from the stream
587  * @io_priority: the [I/O priority][io-priority]
588  * of the request. 
589  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
590  * @callback: (scope async): callback to call when the request is satisfied
591  * @user_data: (closure): the data to pass to callback function
592  *
593  * Request an asynchronous read of @count bytes from the stream into the buffer
594  * starting at @buffer. When the operation is finished @callback will be called. 
595  * You can then call g_input_stream_read_finish() to get the result of the 
596  * operation.
597  *
598  * During an async request no other sync and async calls are allowed on @stream, and will
599  * result in %G_IO_ERROR_PENDING errors. 
600  *
601  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
602  *
603  * On success, the number of bytes read into the buffer will be passed to the
604  * callback. It is not an error if this is not the same as the requested size, as it
605  * can happen e.g. near the end of a file, but generally we try to read
606  * as many bytes as requested. Zero is returned on end of file
607  * (or if @count is zero),  but never otherwise.
608  *
609  * Any outstanding i/o request with higher priority (lower numerical value) will
610  * be executed before an outstanding request with lower priority. Default
611  * priority is %G_PRIORITY_DEFAULT.
612  *
613  * The asynchronous methods have a default fallback that uses threads to implement
614  * asynchronicity, so they are optional for inheriting classes. However, if you
615  * override one you must override all.
616  **/
617 void
618 g_input_stream_read_async (GInputStream        *stream,
619                            void                *buffer,
620                            gsize                count,
621                            int                  io_priority,
622                            GCancellable        *cancellable,
623                            GAsyncReadyCallback  callback,
624                            gpointer             user_data)
625 {
626   GInputStreamClass *class;
627   GError *error = NULL;
628
629   g_return_if_fail (G_IS_INPUT_STREAM (stream));
630   g_return_if_fail (buffer != NULL);
631
632   if (count == 0)
633     {
634       GTask *task;
635
636       task = g_task_new (stream, cancellable, callback, user_data);
637       g_task_set_source_tag (task, g_input_stream_read_async);
638       g_task_return_int (task, 0);
639       g_object_unref (task);
640       return;
641     }
642   
643   if (((gssize) count) < 0)
644     {
645       g_task_report_new_error (stream, callback, user_data,
646                                g_input_stream_read_async,
647                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
648                                _("Too large count value passed to %s"),
649                                G_STRFUNC);
650       return;
651     }
652
653   if (!g_input_stream_set_pending (stream, &error))
654     {
655       g_task_report_error (stream, callback, user_data,
656                            g_input_stream_read_async,
657                            error);
658       return;
659     }
660
661   class = G_INPUT_STREAM_GET_CLASS (stream);
662   stream->priv->outstanding_callback = callback;
663   g_object_ref (stream);
664   class->read_async (stream, buffer, count, io_priority, cancellable,
665                      async_ready_callback_wrapper, user_data);
666 }
667
668 /**
669  * g_input_stream_read_finish:
670  * @stream: a #GInputStream.
671  * @result: a #GAsyncResult.
672  * @error: a #GError location to store the error occurring, or %NULL to 
673  * ignore.
674  * 
675  * Finishes an asynchronous stream read operation. 
676  * 
677  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
678  **/
679 gssize
680 g_input_stream_read_finish (GInputStream  *stream,
681                             GAsyncResult  *result,
682                             GError       **error)
683 {
684   GInputStreamClass *class;
685   
686   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
687   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
688
689   if (g_async_result_legacy_propagate_error (result, error))
690     return -1;
691   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
692     return g_task_propagate_int (G_TASK (result), error);
693
694   class = G_INPUT_STREAM_GET_CLASS (stream);
695   return class->read_finish (stream, result, error);
696 }
697
698 typedef struct
699 {
700   gchar *buffer;
701   gsize to_read;
702   gsize bytes_read;
703 } AsyncReadAll;
704
705 static void
706 free_async_read_all (gpointer data)
707 {
708   g_slice_free (AsyncReadAll, data);
709 }
710
711 static void
712 read_all_callback (GObject      *stream,
713                    GAsyncResult *result,
714                    gpointer      user_data)
715 {
716   GTask *task = user_data;
717   AsyncReadAll *data = g_task_get_task_data (task);
718   gboolean got_eof = FALSE;
719
720   if (result)
721     {
722       GError *error = NULL;
723       gssize nread;
724
725       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
726
727       if (nread == -1)
728         {
729           g_task_return_error (task, error);
730           g_object_unref (task);
731           return;
732         }
733
734       g_assert_cmpint (nread, <=, data->to_read);
735       data->to_read -= nread;
736       data->bytes_read += nread;
737       got_eof = (nread == 0);
738     }
739
740   if (got_eof || data->to_read == 0)
741     {
742       g_task_return_boolean (task, TRUE);
743       g_object_unref (task);
744     }
745
746   else
747     g_input_stream_read_async (G_INPUT_STREAM (stream),
748                                data->buffer + data->bytes_read,
749                                data->to_read,
750                                g_task_get_priority (task),
751                                g_task_get_cancellable (task),
752                                read_all_callback, task);
753 }
754
755
756 static void
757 read_all_async_thread (GTask        *task,
758                        gpointer      source_object,
759                        gpointer      task_data,
760                        GCancellable *cancellable)
761 {
762   GInputStream *stream = source_object;
763   AsyncReadAll *data = task_data;
764   GError *error = NULL;
765
766   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
767                                g_task_get_cancellable (task), &error))
768     g_task_return_boolean (task, TRUE);
769   else
770     g_task_return_error (task, error);
771 }
772
773 /**
774  * g_input_stream_read_all_async:
775  * @stream: A #GInputStream
776  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
777  *     a buffer to read data into (which should be at least count bytes long)
778  * @count: (in): the number of bytes that will be read from the stream
779  * @io_priority: the [I/O priority][io-priority] of the request
780  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
781  * @callback: (scope async): callback to call when the request is satisfied
782  * @user_data: (closure): the data to pass to callback function
783  *
784  * Request an asynchronous read of @count bytes from the stream into the
785  * buffer starting at @buffer.
786  *
787  * This is the asynchronous equivalent of g_input_stream_read_all().
788  *
789  * Call g_input_stream_read_all_finish() to collect the result.
790  *
791  * Any outstanding I/O request with higher priority (lower numerical
792  * value) will be executed before an outstanding request with lower
793  * priority. Default priority is %G_PRIORITY_DEFAULT.
794  *
795  * Since: 2.44
796  **/
797 void
798 g_input_stream_read_all_async (GInputStream        *stream,
799                                void                *buffer,
800                                gsize                count,
801                                int                  io_priority,
802                                GCancellable        *cancellable,
803                                GAsyncReadyCallback  callback,
804                                gpointer             user_data)
805 {
806   AsyncReadAll *data;
807   GTask *task;
808
809   g_return_if_fail (G_IS_INPUT_STREAM (stream));
810   g_return_if_fail (buffer != NULL || count == 0);
811
812   task = g_task_new (stream, cancellable, callback, user_data);
813   data = g_slice_new0 (AsyncReadAll);
814   data->buffer = buffer;
815   data->to_read = count;
816
817   g_task_set_source_tag (task, g_input_stream_read_all_async);
818   g_task_set_task_data (task, data, free_async_read_all);
819   g_task_set_priority (task, io_priority);
820
821   /* If async reads are going to be handled via the threadpool anyway
822    * then we may as well do it with a single dispatch instead of
823    * bouncing in and out.
824    */
825   if (g_input_stream_async_read_is_via_threads (stream))
826     {
827       g_task_run_in_thread (task, read_all_async_thread);
828       g_object_unref (task);
829     }
830   else
831     read_all_callback (G_OBJECT (stream), NULL, task);
832 }
833
834 /**
835  * g_input_stream_read_all_finish:
836  * @stream: a #GInputStream
837  * @result: a #GAsyncResult
838  * @bytes_read: (out): location to store the number of bytes that was read from the stream
839  * @error: a #GError location to store the error occurring, or %NULL to ignore
840  *
841  * Finishes an asynchronous stream read operation started with
842  * g_input_stream_read_all_async().
843  *
844  * As a special exception to the normal conventions for functions that
845  * use #GError, if this function returns %FALSE (and sets @error) then
846  * @bytes_read will be set to the number of bytes that were successfully
847  * read before the error was encountered.  This functionality is only
848  * available from C.  If you need it from another language then you must
849  * write your own loop around g_input_stream_read_async().
850  *
851  * Returns: %TRUE on success, %FALSE if there was an error
852  *
853  * Since: 2.44
854  **/
855 gboolean
856 g_input_stream_read_all_finish (GInputStream  *stream,
857                                 GAsyncResult  *result,
858                                 gsize         *bytes_read,
859                                 GError       **error)
860 {
861   GTask *task;
862
863   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
864   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
865
866   task = G_TASK (result);
867
868   if (bytes_read)
869     {
870       AsyncReadAll *data = g_task_get_task_data (task);
871
872       *bytes_read = data->bytes_read;
873     }
874
875   return g_task_propagate_boolean (task, error);
876 }
877
878 static void
879 read_bytes_callback (GObject      *stream,
880                      GAsyncResult *result,
881                      gpointer      user_data)
882 {
883   GTask *task = user_data;
884   guchar *buf = g_task_get_task_data (task);
885   GError *error = NULL;
886   gssize nread;
887   GBytes *bytes = NULL;
888
889   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
890                                       result, &error);
891   if (nread == -1)
892     {
893       g_free (buf);
894       g_task_return_error (task, error);
895     }
896   else if (nread == 0)
897     {
898       g_free (buf);
899       bytes = g_bytes_new_static ("", 0);
900     }
901   else
902     bytes = g_bytes_new_take (buf, nread);
903
904   if (bytes)
905     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
906
907   g_object_unref (task);
908 }
909
910 /**
911  * g_input_stream_read_bytes_async:
912  * @stream: A #GInputStream.
913  * @count: the number of bytes that will be read from the stream
914  * @io_priority: the [I/O priority][io-priority] of the request
915  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
916  * @callback: (scope async): callback to call when the request is satisfied
917  * @user_data: (closure): the data to pass to callback function
918  *
919  * Request an asynchronous read of @count bytes from the stream into a
920  * new #GBytes. When the operation is finished @callback will be
921  * called. You can then call g_input_stream_read_bytes_finish() to get the
922  * result of the operation.
923  *
924  * During an async request no other sync and async calls are allowed
925  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
926  *
927  * A value of @count larger than %G_MAXSSIZE will cause a
928  * %G_IO_ERROR_INVALID_ARGUMENT error.
929  *
930  * On success, the new #GBytes will be passed to the callback. It is
931  * not an error if this is smaller than the requested size, as it can
932  * happen e.g. near the end of a file, but generally we try to read as
933  * many bytes as requested. Zero is returned on end of file (or if
934  * @count is zero), but never otherwise.
935  *
936  * Any outstanding I/O request with higher priority (lower numerical
937  * value) will be executed before an outstanding request with lower
938  * priority. Default priority is %G_PRIORITY_DEFAULT.
939  *
940  * Since: 2.34
941  **/
942 void
943 g_input_stream_read_bytes_async (GInputStream          *stream,
944                                  gsize                  count,
945                                  int                    io_priority,
946                                  GCancellable          *cancellable,
947                                  GAsyncReadyCallback    callback,
948                                  gpointer               user_data)
949 {
950   GTask *task;
951   guchar *buf;
952
953   task = g_task_new (stream, cancellable, callback, user_data);
954   g_task_set_source_tag (task, g_input_stream_read_bytes_async);
955
956   buf = g_malloc (count);
957   g_task_set_task_data (task, buf, NULL);
958
959   g_input_stream_read_async (stream, buf, count,
960                              io_priority, cancellable,
961                              read_bytes_callback, task);
962 }
963
964 /**
965  * g_input_stream_read_bytes_finish:
966  * @stream: a #GInputStream.
967  * @result: a #GAsyncResult.
968  * @error: a #GError location to store the error occurring, or %NULL to
969  *   ignore.
970  *
971  * Finishes an asynchronous stream read-into-#GBytes operation.
972  *
973  * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
974  *
975  * Since: 2.34
976  **/
977 GBytes *
978 g_input_stream_read_bytes_finish (GInputStream  *stream,
979                                   GAsyncResult  *result,
980                                   GError       **error)
981 {
982   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
983   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
984
985   return g_task_propagate_pointer (G_TASK (result), error);
986 }
987
988 /**
989  * g_input_stream_skip_async:
990  * @stream: A #GInputStream.
991  * @count: the number of bytes that will be skipped from the stream
992  * @io_priority: the [I/O priority][io-priority] of the request
993  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
994  * @callback: (scope async): callback to call when the request is satisfied
995  * @user_data: (closure): the data to pass to callback function
996  *
997  * Request an asynchronous skip of @count bytes from the stream.
998  * When the operation is finished @callback will be called.
999  * You can then call g_input_stream_skip_finish() to get the result
1000  * of the operation.
1001  *
1002  * During an async request no other sync and async calls are allowed,
1003  * and will result in %G_IO_ERROR_PENDING errors.
1004  *
1005  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1006  *
1007  * On success, the number of bytes skipped will be passed to the callback.
1008  * It is not an error if this is not the same as the requested size, as it
1009  * can happen e.g. near the end of a file, but generally we try to skip
1010  * as many bytes as requested. Zero is returned on end of file
1011  * (or if @count is zero), but never otherwise.
1012  *
1013  * Any outstanding i/o request with higher priority (lower numerical value)
1014  * will be executed before an outstanding request with lower priority.
1015  * Default priority is %G_PRIORITY_DEFAULT.
1016  *
1017  * The asynchronous methods have a default fallback that uses threads to
1018  * implement asynchronicity, so they are optional for inheriting classes.
1019  * However, if you override one, you must override all.
1020  **/
1021 void
1022 g_input_stream_skip_async (GInputStream        *stream,
1023                            gsize                count,
1024                            int                  io_priority,
1025                            GCancellable        *cancellable,
1026                            GAsyncReadyCallback  callback,
1027                            gpointer             user_data)
1028 {
1029   GInputStreamClass *class;
1030   GError *error = NULL;
1031
1032   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1033
1034   if (count == 0)
1035     {
1036       GTask *task;
1037
1038       task = g_task_new (stream, cancellable, callback, user_data);
1039       g_task_set_source_tag (task, g_input_stream_skip_async);
1040       g_task_return_int (task, 0);
1041       g_object_unref (task);
1042       return;
1043     }
1044   
1045   if (((gssize) count) < 0)
1046     {
1047       g_task_report_new_error (stream, callback, user_data,
1048                                g_input_stream_skip_async,
1049                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1050                                _("Too large count value passed to %s"),
1051                                G_STRFUNC);
1052       return;
1053     }
1054
1055   if (!g_input_stream_set_pending (stream, &error))
1056     {
1057       g_task_report_error (stream, callback, user_data,
1058                            g_input_stream_skip_async,
1059                            error);
1060       return;
1061     }
1062
1063   class = G_INPUT_STREAM_GET_CLASS (stream);
1064   stream->priv->outstanding_callback = callback;
1065   g_object_ref (stream);
1066   class->skip_async (stream, count, io_priority, cancellable,
1067                      async_ready_callback_wrapper, user_data);
1068 }
1069
1070 /**
1071  * g_input_stream_skip_finish:
1072  * @stream: a #GInputStream.
1073  * @result: a #GAsyncResult.
1074  * @error: a #GError location to store the error occurring, or %NULL to 
1075  * ignore.
1076  * 
1077  * Finishes a stream skip operation.
1078  * 
1079  * Returns: the size of the bytes skipped, or `-1` on error.
1080  **/
1081 gssize
1082 g_input_stream_skip_finish (GInputStream  *stream,
1083                             GAsyncResult  *result,
1084                             GError       **error)
1085 {
1086   GInputStreamClass *class;
1087
1088   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1089   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1090
1091   if (g_async_result_legacy_propagate_error (result, error))
1092     return -1;
1093   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1094     return g_task_propagate_int (G_TASK (result), error);
1095
1096   class = G_INPUT_STREAM_GET_CLASS (stream);
1097   return class->skip_finish (stream, result, error);
1098 }
1099
1100 /**
1101  * g_input_stream_close_async:
1102  * @stream: A #GInputStream.
1103  * @io_priority: the [I/O priority][io-priority] of the request
1104  * @cancellable: (nullable): optional cancellable object
1105  * @callback: (scope async): callback to call when the request is satisfied
1106  * @user_data: (closure): the data to pass to callback function
1107  *
1108  * Requests an asynchronous closes of the stream, releasing resources related to it.
1109  * When the operation is finished @callback will be called. 
1110  * You can then call g_input_stream_close_finish() to get the result of the 
1111  * operation.
1112  *
1113  * For behaviour details see g_input_stream_close().
1114  *
1115  * The asynchronous methods have a default fallback that uses threads to implement
1116  * asynchronicity, so they are optional for inheriting classes. However, if you
1117  * override one you must override all.
1118  **/
1119 void
1120 g_input_stream_close_async (GInputStream        *stream,
1121                             int                  io_priority,
1122                             GCancellable        *cancellable,
1123                             GAsyncReadyCallback  callback,
1124                             gpointer             user_data)
1125 {
1126   GInputStreamClass *class;
1127   GError *error = NULL;
1128
1129   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1130
1131   if (stream->priv->closed)
1132     {
1133       GTask *task;
1134
1135       task = g_task_new (stream, cancellable, callback, user_data);
1136       g_task_set_source_tag (task, g_input_stream_close_async);
1137       g_task_return_boolean (task, TRUE);
1138       g_object_unref (task);
1139       return;
1140     }
1141
1142   if (!g_input_stream_set_pending (stream, &error))
1143     {
1144       g_task_report_error (stream, callback, user_data,
1145                            g_input_stream_close_async,
1146                            error);
1147       return;
1148     }
1149   
1150   class = G_INPUT_STREAM_GET_CLASS (stream);
1151   stream->priv->outstanding_callback = callback;
1152   g_object_ref (stream);
1153   class->close_async (stream, io_priority, cancellable,
1154                       async_ready_close_callback_wrapper, user_data);
1155 }
1156
1157 /**
1158  * g_input_stream_close_finish:
1159  * @stream: a #GInputStream.
1160  * @result: a #GAsyncResult.
1161  * @error: a #GError location to store the error occurring, or %NULL to 
1162  * ignore.
1163  * 
1164  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1165  * 
1166  * Returns: %TRUE if the stream was closed successfully.
1167  **/
1168 gboolean
1169 g_input_stream_close_finish (GInputStream  *stream,
1170                              GAsyncResult  *result,
1171                              GError       **error)
1172 {
1173   GInputStreamClass *class;
1174
1175   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1176   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1177
1178   if (g_async_result_legacy_propagate_error (result, error))
1179     return FALSE;
1180   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1181     return g_task_propagate_boolean (G_TASK (result), error);
1182
1183   class = G_INPUT_STREAM_GET_CLASS (stream);
1184   return class->close_finish (stream, result, error);
1185 }
1186
1187 /**
1188  * g_input_stream_is_closed:
1189  * @stream: input stream.
1190  * 
1191  * Checks if an input stream is closed.
1192  * 
1193  * Returns: %TRUE if the stream is closed.
1194  **/
1195 gboolean
1196 g_input_stream_is_closed (GInputStream *stream)
1197 {
1198   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1199   
1200   return stream->priv->closed;
1201 }
1202  
1203 /**
1204  * g_input_stream_has_pending:
1205  * @stream: input stream.
1206  * 
1207  * Checks if an input stream has pending actions.
1208  * 
1209  * Returns: %TRUE if @stream has pending actions.
1210  **/  
1211 gboolean
1212 g_input_stream_has_pending (GInputStream *stream)
1213 {
1214   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1215   
1216   return stream->priv->pending;
1217 }
1218
1219 /**
1220  * g_input_stream_set_pending:
1221  * @stream: input stream
1222  * @error: a #GError location to store the error occurring, or %NULL to 
1223  * ignore.
1224  * 
1225  * Sets @stream to have actions pending. If the pending flag is
1226  * already set or @stream is closed, it will return %FALSE and set
1227  * @error.
1228  *
1229  * Returns: %TRUE if pending was previously unset and is now set.
1230  **/
1231 gboolean
1232 g_input_stream_set_pending (GInputStream *stream, GError **error)
1233 {
1234   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1235   
1236   if (stream->priv->closed)
1237     {
1238       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1239                            _("Stream is already closed"));
1240       return FALSE;
1241     }
1242   
1243   if (stream->priv->pending)
1244     {
1245       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1246                 /* Translators: This is an error you get if there is already an
1247                  * operation running against this stream when you try to start
1248                  * one */
1249                  _("Stream has outstanding operation"));
1250       return FALSE;
1251     }
1252   
1253   stream->priv->pending = TRUE;
1254   return TRUE;
1255 }
1256
1257 /**
1258  * g_input_stream_clear_pending:
1259  * @stream: input stream
1260  * 
1261  * Clears the pending flag on @stream.
1262  **/
1263 void
1264 g_input_stream_clear_pending (GInputStream *stream)
1265 {
1266   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1267   
1268   stream->priv->pending = FALSE;
1269 }
1270
1271 /*< internal >
1272  * g_input_stream_async_read_is_via_threads:
1273  * @stream: input stream
1274  *
1275  * Checks if an input stream's read_async function uses threads.
1276  *
1277  * Returns: %TRUE if @stream's read_async function uses threads.
1278  **/
1279 gboolean
1280 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1281 {
1282   GInputStreamClass *class;
1283
1284   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1285
1286   class = G_INPUT_STREAM_GET_CLASS (stream);
1287
1288   return (class->read_async == g_input_stream_real_read_async &&
1289       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1290         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1291 }
1292
1293 /*< internal >
1294  * g_input_stream_async_close_is_via_threads:
1295  * @stream: input stream
1296  *
1297  * Checks if an input stream's close_async function uses threads.
1298  *
1299  * Returns: %TRUE if @stream's close_async function uses threads.
1300  **/
1301 gboolean
1302 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1303 {
1304   GInputStreamClass *class;
1305
1306   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1307
1308   class = G_INPUT_STREAM_GET_CLASS (stream);
1309
1310   return class->close_async == g_input_stream_real_close_async;
1311 }
1312
1313 /********************************************
1314  *   Default implementation of async ops    *
1315  ********************************************/
1316
1317 typedef struct {
1318   void   *buffer;
1319   gsize   count;
1320 } ReadData;
1321
1322 static void
1323 free_read_data (ReadData *op)
1324 {
1325   g_slice_free (ReadData, op);
1326 }
1327
1328 static void
1329 read_async_thread (GTask        *task,
1330                    gpointer      source_object,
1331                    gpointer      task_data,
1332                    GCancellable *cancellable)
1333 {
1334   GInputStream *stream = source_object;
1335   ReadData *op = task_data;
1336   GInputStreamClass *class;
1337   GError *error = NULL;
1338   gssize nread;
1339  
1340   class = G_INPUT_STREAM_GET_CLASS (stream);
1341
1342   nread = class->read_fn (stream,
1343                           op->buffer, op->count,
1344                           g_task_get_cancellable (task),
1345                           &error);
1346   if (nread == -1)
1347     g_task_return_error (task, error);
1348   else
1349     g_task_return_int (task, nread);
1350 }
1351
1352 static void read_async_pollable (GPollableInputStream *stream,
1353                                  GTask                *task);
1354
1355 static gboolean
1356 read_async_pollable_ready (GPollableInputStream *stream,
1357                            gpointer              user_data)
1358 {
1359   GTask *task = user_data;
1360
1361   read_async_pollable (stream, task);
1362   return FALSE;
1363 }
1364
1365 static void
1366 read_async_pollable (GPollableInputStream *stream,
1367                      GTask                *task)
1368 {
1369   ReadData *op = g_task_get_task_data (task);
1370   GError *error = NULL;
1371   gssize nread;
1372
1373   if (g_task_return_error_if_cancelled (task))
1374     return;
1375
1376   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1377     read_nonblocking (stream, op->buffer, op->count, &error);
1378
1379   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1380     {
1381       GSource *source;
1382
1383       g_error_free (error);
1384
1385       source = g_pollable_input_stream_create_source (stream,
1386                                                       g_task_get_cancellable (task));
1387       g_task_attach_source (task, source,
1388                             (GSourceFunc) read_async_pollable_ready);
1389       g_source_unref (source);
1390       return;
1391     }
1392
1393   if (nread == -1)
1394     g_task_return_error (task, error);
1395   else
1396     g_task_return_int (task, nread);
1397   /* g_input_stream_real_read_async() unrefs task */
1398 }
1399
1400
1401 static void
1402 g_input_stream_real_read_async (GInputStream        *stream,
1403                                 void                *buffer,
1404                                 gsize                count,
1405                                 int                  io_priority,
1406                                 GCancellable        *cancellable,
1407                                 GAsyncReadyCallback  callback,
1408                                 gpointer             user_data)
1409 {
1410   GTask *task;
1411   ReadData *op;
1412   
1413   op = g_slice_new0 (ReadData);
1414   task = g_task_new (stream, cancellable, callback, user_data);
1415   g_task_set_source_tag (task, g_input_stream_real_read_async);
1416   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1417   g_task_set_priority (task, io_priority);
1418   op->buffer = buffer;
1419   op->count = count;
1420
1421   if (!g_input_stream_async_read_is_via_threads (stream))
1422     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1423   else
1424     g_task_run_in_thread (task, read_async_thread);
1425   g_object_unref (task);
1426 }
1427
1428 static gssize
1429 g_input_stream_real_read_finish (GInputStream  *stream,
1430                                  GAsyncResult  *result,
1431                                  GError       **error)
1432 {
1433   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1434
1435   return g_task_propagate_int (G_TASK (result), error);
1436 }
1437
1438
1439 static void
1440 skip_async_thread (GTask        *task,
1441                    gpointer      source_object,
1442                    gpointer      task_data,
1443                    GCancellable *cancellable)
1444 {
1445   GInputStream *stream = source_object;
1446   gsize count = GPOINTER_TO_SIZE (task_data);
1447   GInputStreamClass *class;
1448   GError *error = NULL;
1449   gssize ret;
1450
1451   class = G_INPUT_STREAM_GET_CLASS (stream);
1452   ret = class->skip (stream, count,
1453                      g_task_get_cancellable (task),
1454                      &error);
1455   if (ret == -1)
1456     g_task_return_error (task, error);
1457   else
1458     g_task_return_int (task, ret);
1459 }
1460
1461 typedef struct {
1462   char buffer[8192];
1463   gsize count;
1464   gsize count_skipped;
1465 } SkipFallbackAsyncData;
1466
1467 static void
1468 skip_callback_wrapper (GObject      *source_object,
1469                        GAsyncResult *res,
1470                        gpointer      user_data)
1471 {
1472   GInputStreamClass *class;
1473   GTask *task = user_data;
1474   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1475   GError *error = NULL;
1476   gssize ret;
1477
1478   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1479
1480   if (ret > 0)
1481     {
1482       data->count -= ret;
1483       data->count_skipped += ret;
1484
1485       if (data->count > 0)
1486         {
1487           class = G_INPUT_STREAM_GET_CLASS (source_object);
1488           class->read_async (G_INPUT_STREAM (source_object),
1489                              data->buffer, MIN (8192, data->count),
1490                              g_task_get_priority (task),
1491                              g_task_get_cancellable (task),
1492                              skip_callback_wrapper, task);
1493           return;
1494         }
1495     }
1496
1497   if (ret == -1 &&
1498       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1499       data->count_skipped)
1500     {
1501       /* No error, return partial read */
1502       g_clear_error (&error);
1503     }
1504
1505   if (error)
1506     g_task_return_error (task, error);
1507   else
1508     g_task_return_int (task, data->count_skipped);
1509   g_object_unref (task);
1510  }
1511
1512 static void
1513 g_input_stream_real_skip_async (GInputStream        *stream,
1514                                 gsize                count,
1515                                 int                  io_priority,
1516                                 GCancellable        *cancellable,
1517                                 GAsyncReadyCallback  callback,
1518                                 gpointer             user_data)
1519 {
1520   GInputStreamClass *class;
1521   SkipFallbackAsyncData *data;
1522   GTask *task;
1523
1524   class = G_INPUT_STREAM_GET_CLASS (stream);
1525
1526   task = g_task_new (stream, cancellable, callback, user_data);
1527   g_task_set_source_tag (task, g_input_stream_real_skip_async);
1528   g_task_set_priority (task, io_priority);
1529
1530   if (g_input_stream_async_read_is_via_threads (stream))
1531     {
1532       /* Read is thread-using async fallback.
1533        * Make skip use threads too, so that we can use a possible sync skip
1534        * implementation. */
1535       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1536
1537       g_task_run_in_thread (task, skip_async_thread);
1538       g_object_unref (task);
1539     }
1540   else
1541     {
1542       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1543       
1544       /* There is a custom async read function, lets use that. */
1545       data = g_new (SkipFallbackAsyncData, 1);
1546       data->count = count;
1547       data->count_skipped = 0;
1548       g_task_set_task_data (task, data, g_free);
1549       g_task_set_check_cancellable (task, FALSE);
1550       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1551                          skip_callback_wrapper, task);
1552     }
1553
1554 }
1555
1556 static gssize
1557 g_input_stream_real_skip_finish (GInputStream  *stream,
1558                                  GAsyncResult  *result,
1559                                  GError       **error)
1560 {
1561   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1562
1563   return g_task_propagate_int (G_TASK (result), error);
1564 }
1565
1566 static void
1567 close_async_thread (GTask        *task,
1568                     gpointer      source_object,
1569                     gpointer      task_data,
1570                     GCancellable *cancellable)
1571 {
1572   GInputStream *stream = source_object;
1573   GInputStreamClass *class;
1574   GError *error = NULL;
1575   gboolean result;
1576
1577   class = G_INPUT_STREAM_GET_CLASS (stream);
1578   if (class->close_fn)
1579     {
1580       result = class->close_fn (stream,
1581                                 g_task_get_cancellable (task),
1582                                 &error);
1583       if (!result)
1584         {
1585           g_task_return_error (task, error);
1586           return;
1587         }
1588     }
1589
1590   g_task_return_boolean (task, TRUE);
1591 }
1592
1593 static void
1594 g_input_stream_real_close_async (GInputStream        *stream,
1595                                  int                  io_priority,
1596                                  GCancellable        *cancellable,
1597                                  GAsyncReadyCallback  callback,
1598                                  gpointer             user_data)
1599 {
1600   GTask *task;
1601
1602   task = g_task_new (stream, cancellable, callback, user_data);
1603   g_task_set_source_tag (task, g_input_stream_real_close_async);
1604   g_task_set_check_cancellable (task, FALSE);
1605   g_task_set_priority (task, io_priority);
1606   
1607   g_task_run_in_thread (task, close_async_thread);
1608   g_object_unref (task);
1609 }
1610
1611 static gboolean
1612 g_input_stream_real_close_finish (GInputStream  *stream,
1613                                   GAsyncResult  *result,
1614                                   GError       **error)
1615 {
1616   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1617
1618   return g_task_propagate_boolean (G_TASK (result), error);
1619 }