GVariant: support serialising to GVariantVectors
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <glib.h>
23 #include "glibintl.h"
24
25 #include "ginputstream.h"
26 #include "gioprivate.h"
27 #include "gseekable.h"
28 #include "gcancellable.h"
29 #include "gasyncresult.h"
30 #include "gioerror.h"
31 #include "gpollableinputstream.h"
32
33 /**
34  * SECTION:ginputstream
35  * @short_description: Base class for implementing streaming input
36  * @include: gio/gio.h
37  *
38  * #GInputStream has functions to read from a stream (g_input_stream_read()),
39  * to close a stream (g_input_stream_close()) and to skip some content
40  * (g_input_stream_skip()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GInputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   GAsyncReadyCallback outstanding_callback;
52 };
53
54 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
55
56 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
57                                                   gsize                 count,
58                                                   GCancellable         *cancellable,
59                                                   GError              **error);
60 static void     g_input_stream_real_read_async   (GInputStream         *stream,
61                                                   void                 *buffer,
62                                                   gsize                 count,
63                                                   int                   io_priority,
64                                                   GCancellable         *cancellable,
65                                                   GAsyncReadyCallback   callback,
66                                                   gpointer              user_data);
67 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
68                                                   GAsyncResult         *result,
69                                                   GError              **error);
70 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
71                                                   gsize                 count,
72                                                   int                   io_priority,
73                                                   GCancellable         *cancellable,
74                                                   GAsyncReadyCallback   callback,
75                                                   gpointer              data);
76 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
77                                                   GAsyncResult         *result,
78                                                   GError              **error);
79 static void     g_input_stream_real_close_async  (GInputStream         *stream,
80                                                   int                   io_priority,
81                                                   GCancellable         *cancellable,
82                                                   GAsyncReadyCallback   callback,
83                                                   gpointer              data);
84 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
85                                                   GAsyncResult         *result,
86                                                   GError              **error);
87
88 static void
89 g_input_stream_dispose (GObject *object)
90 {
91   GInputStream *stream;
92
93   stream = G_INPUT_STREAM (object);
94   
95   if (!stream->priv->closed)
96     g_input_stream_close (stream, NULL, NULL);
97
98   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
99 }
100
101
102 static void
103 g_input_stream_class_init (GInputStreamClass *klass)
104 {
105   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
106   
107   gobject_class->dispose = g_input_stream_dispose;
108   
109   klass->skip = g_input_stream_real_skip;
110   klass->read_async = g_input_stream_real_read_async;
111   klass->read_finish = g_input_stream_real_read_finish;
112   klass->skip_async = g_input_stream_real_skip_async;
113   klass->skip_finish = g_input_stream_real_skip_finish;
114   klass->close_async = g_input_stream_real_close_async;
115   klass->close_finish = g_input_stream_real_close_finish;
116 }
117
118 static void
119 g_input_stream_init (GInputStream *stream)
120 {
121   stream->priv = g_input_stream_get_instance_private (stream);
122 }
123
124 /**
125  * g_input_stream_read:
126  * @stream: a #GInputStream.
127  * @buffer: (array length=count) (element-type guint8): a buffer to
128  *     read data into (which should be at least count bytes long).
129  * @count: the number of bytes that will be read from the stream
130  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
131  * @error: location to store the error occurring, or %NULL to ignore
132  *
133  * Tries to read @count bytes from the stream into the buffer starting at
134  * @buffer. Will block during this read.
135  * 
136  * If count is zero returns zero and does nothing. A value of @count
137  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
138  *
139  * On success, the number of bytes read into the buffer is returned.
140  * It is not an error if this is not the same as the requested size, as it
141  * can happen e.g. near the end of a file. Zero is returned on end of file
142  * (or if @count is zero),  but never otherwise.
143  *
144  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
145  * at any position, and this function doesn't nul-terminate the @buffer.
146  *
147  * If @cancellable is not %NULL, then the operation can be cancelled by
148  * triggering the cancellable object from another thread. If the operation
149  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
150  * operation was partially finished when the operation was cancelled the
151  * partial result will be returned, without an error.
152  *
153  * On error -1 is returned and @error is set accordingly.
154  * 
155  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
156  **/
157 gssize
158 g_input_stream_read  (GInputStream  *stream,
159                       void          *buffer,
160                       gsize          count,
161                       GCancellable  *cancellable,
162                       GError       **error)
163 {
164   GInputStreamClass *class;
165   gssize res;
166
167   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
168   g_return_val_if_fail (buffer != NULL, 0);
169
170   if (count == 0)
171     return 0;
172   
173   if (((gssize) count) < 0)
174     {
175       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
176                    _("Too large count value passed to %s"), G_STRFUNC);
177       return -1;
178     }
179
180   class = G_INPUT_STREAM_GET_CLASS (stream);
181
182   if (class->read_fn == NULL) 
183     {
184       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
185                            _("Input stream doesn't implement read"));
186       return -1;
187     }
188
189   if (!g_input_stream_set_pending (stream, error))
190     return -1;
191
192   if (cancellable)
193     g_cancellable_push_current (cancellable);
194   
195   res = class->read_fn (stream, buffer, count, cancellable, error);
196
197   if (cancellable)
198     g_cancellable_pop_current (cancellable);
199   
200   g_input_stream_clear_pending (stream);
201
202   return res;
203 }
204
205 /**
206  * g_input_stream_read_all:
207  * @stream: a #GInputStream.
208  * @buffer: (array length=count) (element-type guint8): a buffer to
209  *     read data into (which should be at least count bytes long).
210  * @count: the number of bytes that will be read from the stream
211  * @bytes_read: (out): location to store the number of bytes that was read from the stream
212  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
213  * @error: location to store the error occurring, or %NULL to ignore
214  *
215  * Tries to read @count bytes from the stream into the buffer starting at
216  * @buffer. Will block during this read.
217  *
218  * This function is similar to g_input_stream_read(), except it tries to
219  * read as many bytes as requested, only stopping on an error or end of stream.
220  *
221  * On a successful read of @count bytes, or if we reached the end of the
222  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
223  * read into @buffer.
224  * 
225  * If there is an error during the operation %FALSE is returned and @error
226  * is set to indicate the error status.
227  *
228  * As a special exception to the normal conventions for functions that
229  * use #GError, if this function returns %FALSE (and sets @error) then
230  * @bytes_read will be set to the number of bytes that were successfully
231  * read before the error was encountered.  This functionality is only
232  * available from C.  If you need it from another language then you must
233  * write your own loop around g_input_stream_read().
234  *
235  * Returns: %TRUE on success, %FALSE if there was an error
236  **/
237 gboolean
238 g_input_stream_read_all (GInputStream  *stream,
239                          void          *buffer,
240                          gsize          count,
241                          gsize         *bytes_read,
242                          GCancellable  *cancellable,
243                          GError       **error)
244 {
245   gsize _bytes_read;
246   gssize res;
247
248   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
249   g_return_val_if_fail (buffer != NULL, FALSE);
250
251   _bytes_read = 0;
252   while (_bytes_read < count)
253     {
254       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
255                                  cancellable, error);
256       if (res == -1)
257         {
258           if (bytes_read)
259             *bytes_read = _bytes_read;
260           return FALSE;
261         }
262       
263       if (res == 0)
264         break;
265
266       _bytes_read += res;
267     }
268
269   if (bytes_read)
270     *bytes_read = _bytes_read;
271   return TRUE;
272 }
273
274 /**
275  * g_input_stream_read_bytes:
276  * @stream: a #GInputStream.
277  * @count: maximum number of bytes that will be read from the stream. Common
278  * values include 4096 and 8192.
279  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
280  * @error: location to store the error occurring, or %NULL to ignore
281  *
282  * Like g_input_stream_read(), this tries to read @count bytes from
283  * the stream in a blocking fashion. However, rather than reading into
284  * a user-supplied buffer, this will create a new #GBytes containing
285  * the data that was read. This may be easier to use from language
286  * bindings.
287  *
288  * If count is zero, returns a zero-length #GBytes and does nothing. A
289  * value of @count larger than %G_MAXSSIZE will cause a
290  * %G_IO_ERROR_INVALID_ARGUMENT error.
291  *
292  * On success, a new #GBytes is returned. It is not an error if the
293  * size of this object is not the same as the requested size, as it
294  * can happen e.g. near the end of a file. A zero-length #GBytes is
295  * returned on end of file (or if @count is zero), but never
296  * otherwise.
297  *
298  * If @cancellable is not %NULL, then the operation can be cancelled by
299  * triggering the cancellable object from another thread. If the operation
300  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
301  * operation was partially finished when the operation was cancelled the
302  * partial result will be returned, without an error.
303  *
304  * On error %NULL is returned and @error is set accordingly.
305  *
306  * Returns: a new #GBytes, or %NULL on error
307  *
308  * Since: 2.34
309  **/
310 GBytes *
311 g_input_stream_read_bytes (GInputStream  *stream,
312                            gsize          count,
313                            GCancellable  *cancellable,
314                            GError       **error)
315 {
316   guchar *buf;
317   gssize nread;
318
319   buf = g_malloc (count);
320   nread = g_input_stream_read (stream, buf, count, cancellable, error);
321   if (nread == -1)
322     {
323       g_free (buf);
324       return NULL;
325     }
326   else if (nread == 0)
327     {
328       g_free (buf);
329       return g_bytes_new_static ("", 0);
330     }
331   else
332     return g_bytes_new_take (buf, nread);
333 }
334
335 /**
336  * g_input_stream_skip:
337  * @stream: a #GInputStream.
338  * @count: the number of bytes that will be skipped from the stream
339  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
340  * @error: location to store the error occurring, or %NULL to ignore
341  *
342  * Tries to skip @count bytes from the stream. Will block during the operation.
343  *
344  * This is identical to g_input_stream_read(), from a behaviour standpoint,
345  * but the bytes that are skipped are not returned to the user. Some
346  * streams have an implementation that is more efficient than reading the data.
347  *
348  * This function is optional for inherited classes, as the default implementation
349  * emulates it using read.
350  *
351  * If @cancellable is not %NULL, then the operation can be cancelled by
352  * triggering the cancellable object from another thread. If the operation
353  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
354  * operation was partially finished when the operation was cancelled the
355  * partial result will be returned, without an error.
356  *
357  * Returns: Number of bytes skipped, or -1 on error
358  **/
359 gssize
360 g_input_stream_skip (GInputStream  *stream,
361                      gsize          count,
362                      GCancellable  *cancellable,
363                      GError       **error)
364 {
365   GInputStreamClass *class;
366   gssize res;
367
368   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
369
370   if (count == 0)
371     return 0;
372
373   if (((gssize) count) < 0)
374     {
375       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
376                    _("Too large count value passed to %s"), G_STRFUNC);
377       return -1;
378     }
379   
380   class = G_INPUT_STREAM_GET_CLASS (stream);
381
382   if (!g_input_stream_set_pending (stream, error))
383     return -1;
384
385   if (cancellable)
386     g_cancellable_push_current (cancellable);
387   
388   res = class->skip (stream, count, cancellable, error);
389
390   if (cancellable)
391     g_cancellable_pop_current (cancellable);
392   
393   g_input_stream_clear_pending (stream);
394
395   return res;
396 }
397
398 static gssize
399 g_input_stream_real_skip (GInputStream  *stream,
400                           gsize          count,
401                           GCancellable  *cancellable,
402                           GError       **error)
403 {
404   GInputStreamClass *class;
405   gssize ret, read_bytes;
406   char buffer[8192];
407   GError *my_error;
408
409   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
410     {
411       if (g_seekable_seek (G_SEEKABLE (stream),
412                            count,
413                            G_SEEK_CUR,
414                            cancellable,
415                            NULL))
416         return count;
417     }
418
419   /* If not seekable, or seek failed, fall back to reading data: */
420
421   class = G_INPUT_STREAM_GET_CLASS (stream);
422
423   read_bytes = 0;
424   while (1)
425     {
426       my_error = NULL;
427
428       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
429                             cancellable, &my_error);
430       if (ret == -1)
431         {
432           if (read_bytes > 0 &&
433               my_error->domain == G_IO_ERROR &&
434               my_error->code == G_IO_ERROR_CANCELLED)
435             {
436               g_error_free (my_error);
437               return read_bytes;
438             }
439
440           g_propagate_error (error, my_error);
441           return -1;
442         }
443
444       count -= ret;
445       read_bytes += ret;
446
447       if (ret == 0 || count == 0)
448         return read_bytes;
449     }
450 }
451
452 /**
453  * g_input_stream_close:
454  * @stream: A #GInputStream.
455  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
456  * @error: location to store the error occurring, or %NULL to ignore
457  *
458  * Closes the stream, releasing resources related to it.
459  *
460  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
461  * Closing a stream multiple times will not return an error.
462  *
463  * Streams will be automatically closed when the last reference
464  * is dropped, but you might want to call this function to make sure 
465  * resources are released as early as possible.
466  *
467  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
468  * open after the stream is closed. See the documentation for the individual
469  * stream for details.
470  *
471  * On failure the first error that happened will be reported, but the close
472  * operation will finish as much as possible. A stream that failed to
473  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
474  * is important to check and report the error to the user.
475  *
476  * If @cancellable is not %NULL, then the operation can be cancelled by
477  * triggering the cancellable object from another thread. If the operation
478  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
479  * Cancelling a close will still leave the stream closed, but some streams
480  * can use a faster close that doesn't block to e.g. check errors. 
481  *
482  * Returns: %TRUE on success, %FALSE on failure
483  **/
484 gboolean
485 g_input_stream_close (GInputStream  *stream,
486                       GCancellable  *cancellable,
487                       GError       **error)
488 {
489   GInputStreamClass *class;
490   gboolean res;
491
492   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
493
494   class = G_INPUT_STREAM_GET_CLASS (stream);
495
496   if (stream->priv->closed)
497     return TRUE;
498
499   res = TRUE;
500
501   if (!g_input_stream_set_pending (stream, error))
502     return FALSE;
503
504   if (cancellable)
505     g_cancellable_push_current (cancellable);
506
507   if (class->close_fn)
508     res = class->close_fn (stream, cancellable, error);
509
510   if (cancellable)
511     g_cancellable_pop_current (cancellable);
512
513   g_input_stream_clear_pending (stream);
514   
515   stream->priv->closed = TRUE;
516   
517   return res;
518 }
519
520 static void
521 async_ready_callback_wrapper (GObject      *source_object,
522                               GAsyncResult *res,
523                               gpointer      user_data)
524 {
525   GInputStream *stream = G_INPUT_STREAM (source_object);
526
527   g_input_stream_clear_pending (stream);
528   if (stream->priv->outstanding_callback)
529     (*stream->priv->outstanding_callback) (source_object, res, user_data);
530   g_object_unref (stream);
531 }
532
533 static void
534 async_ready_close_callback_wrapper (GObject      *source_object,
535                                     GAsyncResult *res,
536                                     gpointer      user_data)
537 {
538   GInputStream *stream = G_INPUT_STREAM (source_object);
539
540   g_input_stream_clear_pending (stream);
541   stream->priv->closed = TRUE;
542   if (stream->priv->outstanding_callback)
543     (*stream->priv->outstanding_callback) (source_object, res, user_data);
544   g_object_unref (stream);
545 }
546
547 /**
548  * g_input_stream_read_async:
549  * @stream: A #GInputStream.
550  * @buffer: (array length=count) (element-type guint8): a buffer to
551  *     read data into (which should be at least count bytes long).
552  * @count: the number of bytes that will be read from the stream
553  * @io_priority: the [I/O priority][io-priority]
554  * of the request. 
555  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
556  * @callback: (scope async): callback to call when the request is satisfied
557  * @user_data: (closure): the data to pass to callback function
558  *
559  * Request an asynchronous read of @count bytes from the stream into the buffer
560  * starting at @buffer. When the operation is finished @callback will be called. 
561  * You can then call g_input_stream_read_finish() to get the result of the 
562  * operation.
563  *
564  * During an async request no other sync and async calls are allowed on @stream, and will
565  * result in %G_IO_ERROR_PENDING errors. 
566  *
567  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
568  *
569  * On success, the number of bytes read into the buffer will be passed to the
570  * callback. It is not an error if this is not the same as the requested size, as it
571  * can happen e.g. near the end of a file, but generally we try to read
572  * as many bytes as requested. Zero is returned on end of file
573  * (or if @count is zero),  but never otherwise.
574  *
575  * Any outstanding i/o request with higher priority (lower numerical value) will
576  * be executed before an outstanding request with lower priority. Default
577  * priority is %G_PRIORITY_DEFAULT.
578  *
579  * The asyncronous methods have a default fallback that uses threads to implement
580  * asynchronicity, so they are optional for inheriting classes. However, if you
581  * override one you must override all.
582  **/
583 void
584 g_input_stream_read_async (GInputStream        *stream,
585                            void                *buffer,
586                            gsize                count,
587                            int                  io_priority,
588                            GCancellable        *cancellable,
589                            GAsyncReadyCallback  callback,
590                            gpointer             user_data)
591 {
592   GInputStreamClass *class;
593   GError *error = NULL;
594
595   g_return_if_fail (G_IS_INPUT_STREAM (stream));
596   g_return_if_fail (buffer != NULL);
597
598   if (count == 0)
599     {
600       GTask *task;
601
602       task = g_task_new (stream, cancellable, callback, user_data);
603       g_task_set_source_tag (task, g_input_stream_read_async);
604       g_task_return_int (task, 0);
605       g_object_unref (task);
606       return;
607     }
608   
609   if (((gssize) count) < 0)
610     {
611       g_task_report_new_error (stream, callback, user_data,
612                                g_input_stream_read_async,
613                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
614                                _("Too large count value passed to %s"),
615                                G_STRFUNC);
616       return;
617     }
618
619   if (!g_input_stream_set_pending (stream, &error))
620     {
621       g_task_report_error (stream, callback, user_data,
622                            g_input_stream_read_async,
623                            error);
624       return;
625     }
626
627   class = G_INPUT_STREAM_GET_CLASS (stream);
628   stream->priv->outstanding_callback = callback;
629   g_object_ref (stream);
630   class->read_async (stream, buffer, count, io_priority, cancellable,
631                      async_ready_callback_wrapper, user_data);
632 }
633
634 /**
635  * g_input_stream_read_finish:
636  * @stream: a #GInputStream.
637  * @result: a #GAsyncResult.
638  * @error: a #GError location to store the error occurring, or %NULL to 
639  * ignore.
640  * 
641  * Finishes an asynchronous stream read operation. 
642  * 
643  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
644  **/
645 gssize
646 g_input_stream_read_finish (GInputStream  *stream,
647                             GAsyncResult  *result,
648                             GError       **error)
649 {
650   GInputStreamClass *class;
651   
652   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
653   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
654
655   if (g_async_result_legacy_propagate_error (result, error))
656     return -1;
657   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
658     return g_task_propagate_int (G_TASK (result), error);
659
660   class = G_INPUT_STREAM_GET_CLASS (stream);
661   return class->read_finish (stream, result, error);
662 }
663
664 typedef struct
665 {
666   gchar *buffer;
667   gsize to_read;
668   gsize bytes_read;
669 } AsyncReadAll;
670
671 static void
672 free_async_read_all (gpointer data)
673 {
674   g_slice_free (AsyncReadAll, data);
675 }
676
677 static void
678 read_all_callback (GObject      *stream,
679                    GAsyncResult *result,
680                    gpointer      user_data)
681 {
682   GTask *task = user_data;
683   AsyncReadAll *data = g_task_get_task_data (task);
684   gboolean got_eof = FALSE;
685
686   if (result)
687     {
688       GError *error = NULL;
689       gssize nread;
690
691       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
692
693       if (nread == -1)
694         {
695           g_task_return_error (task, error);
696           g_object_unref (task);
697           return;
698         }
699
700       g_assert_cmpint (nread, <=, data->to_read);
701       data->to_read -= nread;
702       data->bytes_read += nread;
703       got_eof = (nread == 0);
704     }
705
706   if (got_eof || data->to_read == 0)
707     {
708       g_task_return_boolean (task, TRUE);
709       g_object_unref (task);
710     }
711
712   else
713     g_input_stream_read_async (G_INPUT_STREAM (stream),
714                                data->buffer + data->bytes_read,
715                                data->to_read,
716                                g_task_get_priority (task),
717                                g_task_get_cancellable (task),
718                                read_all_callback, task);
719 }
720
721
722 static void
723 read_all_async_thread (GTask        *task,
724                        gpointer      source_object,
725                        gpointer      task_data,
726                        GCancellable *cancellable)
727 {
728   GInputStream *stream = source_object;
729   AsyncReadAll *data = task_data;
730   GError *error = NULL;
731
732   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
733                                g_task_get_cancellable (task), &error))
734     g_task_return_boolean (task, TRUE);
735   else
736     g_task_return_error (task, error);
737 }
738
739 /**
740  * g_input_stream_read_all_async:
741  * @stream: A #GInputStream
742  * @buffer: (array length=count) (element-type guint8): a buffer to
743  *     read data into (which should be at least count bytes long)
744  * @count: the number of bytes that will be read from the stream
745  * @io_priority: the [I/O priority][io-priority] of the request
746  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
747  * @callback: (scope async): callback to call when the request is satisfied
748  * @user_data: (closure): the data to pass to callback function
749  *
750  * Request an asynchronous read of @count bytes from the stream into the
751  * buffer starting at @buffer.
752  *
753  * This is the asynchronous equivalent of g_input_stream_read_all().
754  *
755  * Call g_input_stream_read_all_finish() to collect the result.
756  *
757  * Any outstanding I/O request with higher priority (lower numerical
758  * value) will be executed before an outstanding request with lower
759  * priority. Default priority is %G_PRIORITY_DEFAULT.
760  *
761  * Since: 2.44
762  **/
763 void
764 g_input_stream_read_all_async (GInputStream        *stream,
765                                void                *buffer,
766                                gsize                count,
767                                int                  io_priority,
768                                GCancellable        *cancellable,
769                                GAsyncReadyCallback  callback,
770                                gpointer             user_data)
771 {
772   AsyncReadAll *data;
773   GTask *task;
774
775   g_return_if_fail (G_IS_INPUT_STREAM (stream));
776   g_return_if_fail (buffer != NULL || count == 0);
777
778   task = g_task_new (stream, cancellable, callback, user_data);
779   data = g_slice_new0 (AsyncReadAll);
780   data->buffer = buffer;
781   data->to_read = count;
782
783   g_task_set_task_data (task, data, free_async_read_all);
784   g_task_set_priority (task, io_priority);
785
786   /* If async reads are going to be handled via the threadpool anyway
787    * then we may as well do it with a single dispatch instead of
788    * bouncing in and out.
789    */
790   if (g_input_stream_async_read_is_via_threads (stream))
791     {
792       g_task_run_in_thread (task, read_all_async_thread);
793       g_object_unref (task);
794     }
795   else
796     read_all_callback (G_OBJECT (stream), NULL, task);
797 }
798
799 /**
800  * g_input_stream_read_all_finish:
801  * @stream: a #GInputStream
802  * @result: a #GAsyncResult
803  * @bytes_read: (out): location to store the number of bytes that was read from the stream
804  * @error: a #GError location to store the error occurring, or %NULL to ignore
805  *
806  * Finishes an asynchronous stream read operation started with
807  * g_input_stream_read_all_async().
808  *
809  * As a special exception to the normal conventions for functions that
810  * use #GError, if this function returns %FALSE (and sets @error) then
811  * @bytes_read will be set to the number of bytes that were successfully
812  * read before the error was encountered.  This functionality is only
813  * available from C.  If you need it from another language then you must
814  * write your own loop around g_input_stream_read_async().
815  *
816  * Returns: %TRUE on success, %FALSE if there was an error
817  *
818  * Since: 2.44
819  **/
820 gboolean
821 g_input_stream_read_all_finish (GInputStream  *stream,
822                                 GAsyncResult  *result,
823                                 gsize         *bytes_read,
824                                 GError       **error)
825 {
826   GTask *task;
827
828   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
829   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
830
831   task = G_TASK (result);
832
833   if (bytes_read)
834     {
835       AsyncReadAll *data = g_task_get_task_data (task);
836
837       *bytes_read = data->bytes_read;
838     }
839
840   return g_task_propagate_boolean (task, error);
841 }
842
843 static void
844 read_bytes_callback (GObject      *stream,
845                      GAsyncResult *result,
846                      gpointer      user_data)
847 {
848   GTask *task = user_data;
849   guchar *buf = g_task_get_task_data (task);
850   GError *error = NULL;
851   gssize nread;
852   GBytes *bytes = NULL;
853
854   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
855                                       result, &error);
856   if (nread == -1)
857     {
858       g_free (buf);
859       g_task_return_error (task, error);
860     }
861   else if (nread == 0)
862     {
863       g_free (buf);
864       bytes = g_bytes_new_static ("", 0);
865     }
866   else
867     bytes = g_bytes_new_take (buf, nread);
868
869   if (bytes)
870     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
871
872   g_object_unref (task);
873 }
874
875 /**
876  * g_input_stream_read_bytes_async:
877  * @stream: A #GInputStream.
878  * @count: the number of bytes that will be read from the stream
879  * @io_priority: the [I/O priority][io-priority] of the request
880  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
881  * @callback: (scope async): callback to call when the request is satisfied
882  * @user_data: (closure): the data to pass to callback function
883  *
884  * Request an asynchronous read of @count bytes from the stream into a
885  * new #GBytes. When the operation is finished @callback will be
886  * called. You can then call g_input_stream_read_bytes_finish() to get the
887  * result of the operation.
888  *
889  * During an async request no other sync and async calls are allowed
890  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
891  *
892  * A value of @count larger than %G_MAXSSIZE will cause a
893  * %G_IO_ERROR_INVALID_ARGUMENT error.
894  *
895  * On success, the new #GBytes will be passed to the callback. It is
896  * not an error if this is smaller than the requested size, as it can
897  * happen e.g. near the end of a file, but generally we try to read as
898  * many bytes as requested. Zero is returned on end of file (or if
899  * @count is zero), but never otherwise.
900  *
901  * Any outstanding I/O request with higher priority (lower numerical
902  * value) will be executed before an outstanding request with lower
903  * priority. Default priority is %G_PRIORITY_DEFAULT.
904  *
905  * Since: 2.34
906  **/
907 void
908 g_input_stream_read_bytes_async (GInputStream          *stream,
909                                  gsize                  count,
910                                  int                    io_priority,
911                                  GCancellable          *cancellable,
912                                  GAsyncReadyCallback    callback,
913                                  gpointer               user_data)
914 {
915   GTask *task;
916   guchar *buf;
917
918   task = g_task_new (stream, cancellable, callback, user_data);
919   buf = g_malloc (count);
920   g_task_set_task_data (task, buf, NULL);
921
922   g_input_stream_read_async (stream, buf, count,
923                              io_priority, cancellable,
924                              read_bytes_callback, task);
925 }
926
927 /**
928  * g_input_stream_read_bytes_finish:
929  * @stream: a #GInputStream.
930  * @result: a #GAsyncResult.
931  * @error: a #GError location to store the error occurring, or %NULL to
932  *   ignore.
933  *
934  * Finishes an asynchronous stream read-into-#GBytes operation.
935  *
936  * Returns: the newly-allocated #GBytes, or %NULL on error
937  *
938  * Since: 2.34
939  **/
940 GBytes *
941 g_input_stream_read_bytes_finish (GInputStream  *stream,
942                                   GAsyncResult  *result,
943                                   GError       **error)
944 {
945   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
946   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
947
948   return g_task_propagate_pointer (G_TASK (result), error);
949 }
950
951 /**
952  * g_input_stream_skip_async:
953  * @stream: A #GInputStream.
954  * @count: the number of bytes that will be skipped from the stream
955  * @io_priority: the [I/O priority][io-priority] of the request
956  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
957  * @callback: (scope async): callback to call when the request is satisfied
958  * @user_data: (closure): the data to pass to callback function
959  *
960  * Request an asynchronous skip of @count bytes from the stream.
961  * When the operation is finished @callback will be called.
962  * You can then call g_input_stream_skip_finish() to get the result
963  * of the operation.
964  *
965  * During an async request no other sync and async calls are allowed,
966  * and will result in %G_IO_ERROR_PENDING errors.
967  *
968  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
969  *
970  * On success, the number of bytes skipped will be passed to the callback.
971  * It is not an error if this is not the same as the requested size, as it
972  * can happen e.g. near the end of a file, but generally we try to skip
973  * as many bytes as requested. Zero is returned on end of file
974  * (or if @count is zero), but never otherwise.
975  *
976  * Any outstanding i/o request with higher priority (lower numerical value)
977  * will be executed before an outstanding request with lower priority.
978  * Default priority is %G_PRIORITY_DEFAULT.
979  *
980  * The asynchronous methods have a default fallback that uses threads to
981  * implement asynchronicity, so they are optional for inheriting classes.
982  * However, if you override one, you must override all.
983  **/
984 void
985 g_input_stream_skip_async (GInputStream        *stream,
986                            gsize                count,
987                            int                  io_priority,
988                            GCancellable        *cancellable,
989                            GAsyncReadyCallback  callback,
990                            gpointer             user_data)
991 {
992   GInputStreamClass *class;
993   GError *error = NULL;
994
995   g_return_if_fail (G_IS_INPUT_STREAM (stream));
996
997   if (count == 0)
998     {
999       GTask *task;
1000
1001       task = g_task_new (stream, cancellable, callback, user_data);
1002       g_task_set_source_tag (task, g_input_stream_skip_async);
1003       g_task_return_int (task, 0);
1004       g_object_unref (task);
1005       return;
1006     }
1007   
1008   if (((gssize) count) < 0)
1009     {
1010       g_task_report_new_error (stream, callback, user_data,
1011                                g_input_stream_skip_async,
1012                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1013                                _("Too large count value passed to %s"),
1014                                G_STRFUNC);
1015       return;
1016     }
1017
1018   if (!g_input_stream_set_pending (stream, &error))
1019     {
1020       g_task_report_error (stream, callback, user_data,
1021                            g_input_stream_skip_async,
1022                            error);
1023       return;
1024     }
1025
1026   class = G_INPUT_STREAM_GET_CLASS (stream);
1027   stream->priv->outstanding_callback = callback;
1028   g_object_ref (stream);
1029   class->skip_async (stream, count, io_priority, cancellable,
1030                      async_ready_callback_wrapper, user_data);
1031 }
1032
1033 /**
1034  * g_input_stream_skip_finish:
1035  * @stream: a #GInputStream.
1036  * @result: a #GAsyncResult.
1037  * @error: a #GError location to store the error occurring, or %NULL to 
1038  * ignore.
1039  * 
1040  * Finishes a stream skip operation.
1041  * 
1042  * Returns: the size of the bytes skipped, or %-1 on error.
1043  **/
1044 gssize
1045 g_input_stream_skip_finish (GInputStream  *stream,
1046                             GAsyncResult  *result,
1047                             GError       **error)
1048 {
1049   GInputStreamClass *class;
1050
1051   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1052   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1053
1054   if (g_async_result_legacy_propagate_error (result, error))
1055     return -1;
1056   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1057     return g_task_propagate_int (G_TASK (result), error);
1058
1059   class = G_INPUT_STREAM_GET_CLASS (stream);
1060   return class->skip_finish (stream, result, error);
1061 }
1062
1063 /**
1064  * g_input_stream_close_async:
1065  * @stream: A #GInputStream.
1066  * @io_priority: the [I/O priority][io-priority] of the request
1067  * @cancellable: (allow-none): optional cancellable object
1068  * @callback: (scope async): callback to call when the request is satisfied
1069  * @user_data: (closure): the data to pass to callback function
1070  *
1071  * Requests an asynchronous closes of the stream, releasing resources related to it.
1072  * When the operation is finished @callback will be called. 
1073  * You can then call g_input_stream_close_finish() to get the result of the 
1074  * operation.
1075  *
1076  * For behaviour details see g_input_stream_close().
1077  *
1078  * The asyncronous methods have a default fallback that uses threads to implement
1079  * asynchronicity, so they are optional for inheriting classes. However, if you
1080  * override one you must override all.
1081  **/
1082 void
1083 g_input_stream_close_async (GInputStream        *stream,
1084                             int                  io_priority,
1085                             GCancellable        *cancellable,
1086                             GAsyncReadyCallback  callback,
1087                             gpointer             user_data)
1088 {
1089   GInputStreamClass *class;
1090   GError *error = NULL;
1091
1092   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1093
1094   if (stream->priv->closed)
1095     {
1096       GTask *task;
1097
1098       task = g_task_new (stream, cancellable, callback, user_data);
1099       g_task_set_source_tag (task, g_input_stream_close_async);
1100       g_task_return_boolean (task, TRUE);
1101       g_object_unref (task);
1102       return;
1103     }
1104
1105   if (!g_input_stream_set_pending (stream, &error))
1106     {
1107       g_task_report_error (stream, callback, user_data,
1108                            g_input_stream_close_async,
1109                            error);
1110       return;
1111     }
1112   
1113   class = G_INPUT_STREAM_GET_CLASS (stream);
1114   stream->priv->outstanding_callback = callback;
1115   g_object_ref (stream);
1116   class->close_async (stream, io_priority, cancellable,
1117                       async_ready_close_callback_wrapper, user_data);
1118 }
1119
1120 /**
1121  * g_input_stream_close_finish:
1122  * @stream: a #GInputStream.
1123  * @result: a #GAsyncResult.
1124  * @error: a #GError location to store the error occurring, or %NULL to 
1125  * ignore.
1126  * 
1127  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1128  * 
1129  * Returns: %TRUE if the stream was closed successfully.
1130  **/
1131 gboolean
1132 g_input_stream_close_finish (GInputStream  *stream,
1133                              GAsyncResult  *result,
1134                              GError       **error)
1135 {
1136   GInputStreamClass *class;
1137
1138   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1139   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1140
1141   if (g_async_result_legacy_propagate_error (result, error))
1142     return FALSE;
1143   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1144     return g_task_propagate_boolean (G_TASK (result), error);
1145
1146   class = G_INPUT_STREAM_GET_CLASS (stream);
1147   return class->close_finish (stream, result, error);
1148 }
1149
1150 /**
1151  * g_input_stream_is_closed:
1152  * @stream: input stream.
1153  * 
1154  * Checks if an input stream is closed.
1155  * 
1156  * Returns: %TRUE if the stream is closed.
1157  **/
1158 gboolean
1159 g_input_stream_is_closed (GInputStream *stream)
1160 {
1161   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1162   
1163   return stream->priv->closed;
1164 }
1165  
1166 /**
1167  * g_input_stream_has_pending:
1168  * @stream: input stream.
1169  * 
1170  * Checks if an input stream has pending actions.
1171  * 
1172  * Returns: %TRUE if @stream has pending actions.
1173  **/  
1174 gboolean
1175 g_input_stream_has_pending (GInputStream *stream)
1176 {
1177   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1178   
1179   return stream->priv->pending;
1180 }
1181
1182 /**
1183  * g_input_stream_set_pending:
1184  * @stream: input stream
1185  * @error: a #GError location to store the error occurring, or %NULL to 
1186  * ignore.
1187  * 
1188  * Sets @stream to have actions pending. If the pending flag is
1189  * already set or @stream is closed, it will return %FALSE and set
1190  * @error.
1191  *
1192  * Returns: %TRUE if pending was previously unset and is now set.
1193  **/
1194 gboolean
1195 g_input_stream_set_pending (GInputStream *stream, GError **error)
1196 {
1197   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1198   
1199   if (stream->priv->closed)
1200     {
1201       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1202                            _("Stream is already closed"));
1203       return FALSE;
1204     }
1205   
1206   if (stream->priv->pending)
1207     {
1208       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1209                 /* Translators: This is an error you get if there is already an
1210                  * operation running against this stream when you try to start
1211                  * one */
1212                  _("Stream has outstanding operation"));
1213       return FALSE;
1214     }
1215   
1216   stream->priv->pending = TRUE;
1217   return TRUE;
1218 }
1219
1220 /**
1221  * g_input_stream_clear_pending:
1222  * @stream: input stream
1223  * 
1224  * Clears the pending flag on @stream.
1225  **/
1226 void
1227 g_input_stream_clear_pending (GInputStream *stream)
1228 {
1229   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1230   
1231   stream->priv->pending = FALSE;
1232 }
1233
1234 /**
1235  * g_input_stream_async_read_is_via_threads:
1236  * @stream: input stream
1237  *
1238  * Checks if an input stream's read_async function uses threads.
1239  *
1240  * Returns: %TRUE if @stream's read_async function uses threads.
1241  **/
1242 gboolean
1243 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1244 {
1245   GInputStreamClass *class;
1246
1247   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1248
1249   class = G_INPUT_STREAM_GET_CLASS (stream);
1250
1251   return (class->read_async == g_input_stream_real_read_async &&
1252       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1253         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1254 }
1255
1256 /********************************************
1257  *   Default implementation of async ops    *
1258  ********************************************/
1259
1260 typedef struct {
1261   void   *buffer;
1262   gsize   count;
1263 } ReadData;
1264
1265 static void
1266 free_read_data (ReadData *op)
1267 {
1268   g_slice_free (ReadData, op);
1269 }
1270
1271 static void
1272 read_async_thread (GTask        *task,
1273                    gpointer      source_object,
1274                    gpointer      task_data,
1275                    GCancellable *cancellable)
1276 {
1277   GInputStream *stream = source_object;
1278   ReadData *op = task_data;
1279   GInputStreamClass *class;
1280   GError *error = NULL;
1281   gssize nread;
1282  
1283   class = G_INPUT_STREAM_GET_CLASS (stream);
1284
1285   nread = class->read_fn (stream,
1286                           op->buffer, op->count,
1287                           g_task_get_cancellable (task),
1288                           &error);
1289   if (nread == -1)
1290     g_task_return_error (task, error);
1291   else
1292     g_task_return_int (task, nread);
1293 }
1294
1295 static void read_async_pollable (GPollableInputStream *stream,
1296                                  GTask                *task);
1297
1298 static gboolean
1299 read_async_pollable_ready (GPollableInputStream *stream,
1300                            gpointer              user_data)
1301 {
1302   GTask *task = user_data;
1303
1304   read_async_pollable (stream, task);
1305   return FALSE;
1306 }
1307
1308 static void
1309 read_async_pollable (GPollableInputStream *stream,
1310                      GTask                *task)
1311 {
1312   ReadData *op = g_task_get_task_data (task);
1313   GError *error = NULL;
1314   gssize nread;
1315
1316   if (g_task_return_error_if_cancelled (task))
1317     return;
1318
1319   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1320     read_nonblocking (stream, op->buffer, op->count, &error);
1321
1322   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1323     {
1324       GSource *source;
1325
1326       g_error_free (error);
1327
1328       source = g_pollable_input_stream_create_source (stream,
1329                                                       g_task_get_cancellable (task));
1330       g_task_attach_source (task, source,
1331                             (GSourceFunc) read_async_pollable_ready);
1332       g_source_unref (source);
1333       return;
1334     }
1335
1336   if (nread == -1)
1337     g_task_return_error (task, error);
1338   else
1339     g_task_return_int (task, nread);
1340   /* g_input_stream_real_read_async() unrefs task */
1341 }
1342
1343
1344 static void
1345 g_input_stream_real_read_async (GInputStream        *stream,
1346                                 void                *buffer,
1347                                 gsize                count,
1348                                 int                  io_priority,
1349                                 GCancellable        *cancellable,
1350                                 GAsyncReadyCallback  callback,
1351                                 gpointer             user_data)
1352 {
1353   GTask *task;
1354   ReadData *op;
1355   
1356   op = g_slice_new0 (ReadData);
1357   task = g_task_new (stream, cancellable, callback, user_data);
1358   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1359   g_task_set_priority (task, io_priority);
1360   op->buffer = buffer;
1361   op->count = count;
1362
1363   if (!g_input_stream_async_read_is_via_threads (stream))
1364     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1365   else
1366     g_task_run_in_thread (task, read_async_thread);
1367   g_object_unref (task);
1368 }
1369
1370 static gssize
1371 g_input_stream_real_read_finish (GInputStream  *stream,
1372                                  GAsyncResult  *result,
1373                                  GError       **error)
1374 {
1375   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1376
1377   return g_task_propagate_int (G_TASK (result), error);
1378 }
1379
1380
1381 static void
1382 skip_async_thread (GTask        *task,
1383                    gpointer      source_object,
1384                    gpointer      task_data,
1385                    GCancellable *cancellable)
1386 {
1387   GInputStream *stream = source_object;
1388   gsize count = GPOINTER_TO_SIZE (task_data);
1389   GInputStreamClass *class;
1390   GError *error = NULL;
1391   gssize ret;
1392
1393   class = G_INPUT_STREAM_GET_CLASS (stream);
1394   ret = class->skip (stream, count,
1395                      g_task_get_cancellable (task),
1396                      &error);
1397   if (ret == -1)
1398     g_task_return_error (task, error);
1399   else
1400     g_task_return_int (task, ret);
1401 }
1402
1403 typedef struct {
1404   char buffer[8192];
1405   gsize count;
1406   gsize count_skipped;
1407 } SkipFallbackAsyncData;
1408
1409 static void
1410 skip_callback_wrapper (GObject      *source_object,
1411                        GAsyncResult *res,
1412                        gpointer      user_data)
1413 {
1414   GInputStreamClass *class;
1415   GTask *task = user_data;
1416   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1417   GError *error = NULL;
1418   gssize ret;
1419
1420   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1421
1422   if (ret > 0)
1423     {
1424       data->count -= ret;
1425       data->count_skipped += ret;
1426
1427       if (data->count > 0)
1428         {
1429           class = G_INPUT_STREAM_GET_CLASS (source_object);
1430           class->read_async (G_INPUT_STREAM (source_object),
1431                              data->buffer, MIN (8192, data->count),
1432                              g_task_get_priority (task),
1433                              g_task_get_cancellable (task),
1434                              skip_callback_wrapper, task);
1435           return;
1436         }
1437     }
1438
1439   if (ret == -1 &&
1440       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1441       data->count_skipped)
1442     {
1443       /* No error, return partial read */
1444       g_clear_error (&error);
1445     }
1446
1447   if (error)
1448     g_task_return_error (task, error);
1449   else
1450     g_task_return_int (task, data->count_skipped);
1451   g_object_unref (task);
1452  }
1453
1454 static void
1455 g_input_stream_real_skip_async (GInputStream        *stream,
1456                                 gsize                count,
1457                                 int                  io_priority,
1458                                 GCancellable        *cancellable,
1459                                 GAsyncReadyCallback  callback,
1460                                 gpointer             user_data)
1461 {
1462   GInputStreamClass *class;
1463   SkipFallbackAsyncData *data;
1464   GTask *task;
1465
1466   class = G_INPUT_STREAM_GET_CLASS (stream);
1467
1468   task = g_task_new (stream, cancellable, callback, user_data);
1469   g_task_set_priority (task, io_priority);
1470
1471   if (g_input_stream_async_read_is_via_threads (stream))
1472     {
1473       /* Read is thread-using async fallback.
1474        * Make skip use threads too, so that we can use a possible sync skip
1475        * implementation. */
1476       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1477
1478       g_task_run_in_thread (task, skip_async_thread);
1479       g_object_unref (task);
1480     }
1481   else
1482     {
1483       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1484       
1485       /* There is a custom async read function, lets use that. */
1486       data = g_new (SkipFallbackAsyncData, 1);
1487       data->count = count;
1488       data->count_skipped = 0;
1489       g_task_set_task_data (task, data, g_free);
1490       g_task_set_check_cancellable (task, FALSE);
1491       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1492                          skip_callback_wrapper, task);
1493     }
1494
1495 }
1496
1497 static gssize
1498 g_input_stream_real_skip_finish (GInputStream  *stream,
1499                                  GAsyncResult  *result,
1500                                  GError       **error)
1501 {
1502   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1503
1504   return g_task_propagate_int (G_TASK (result), error);
1505 }
1506
1507 static void
1508 close_async_thread (GTask        *task,
1509                     gpointer      source_object,
1510                     gpointer      task_data,
1511                     GCancellable *cancellable)
1512 {
1513   GInputStream *stream = source_object;
1514   GInputStreamClass *class;
1515   GError *error = NULL;
1516   gboolean result;
1517
1518   class = G_INPUT_STREAM_GET_CLASS (stream);
1519   if (class->close_fn)
1520     {
1521       result = class->close_fn (stream,
1522                                 g_task_get_cancellable (task),
1523                                 &error);
1524       if (!result)
1525         {
1526           g_task_return_error (task, error);
1527           return;
1528         }
1529     }
1530
1531   g_task_return_boolean (task, TRUE);
1532 }
1533
1534 static void
1535 g_input_stream_real_close_async (GInputStream        *stream,
1536                                  int                  io_priority,
1537                                  GCancellable        *cancellable,
1538                                  GAsyncReadyCallback  callback,
1539                                  gpointer             user_data)
1540 {
1541   GTask *task;
1542
1543   task = g_task_new (stream, cancellable, callback, user_data);
1544   g_task_set_check_cancellable (task, FALSE);
1545   g_task_set_priority (task, io_priority);
1546   
1547   g_task_run_in_thread (task, close_async_thread);
1548   g_object_unref (task);
1549 }
1550
1551 static gboolean
1552 g_input_stream_real_close_finish (GInputStream  *stream,
1553                                   GAsyncResult  *result,
1554                                   GError       **error)
1555 {
1556   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1557
1558   return g_task_propagate_boolean (G_TASK (result), error);
1559 }