spec: Remove kdbus-extension packages for _with_da_profile
[platform/upstream/glib.git] / gio / ginputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * SPDX-License-Identifier: LGPL-2.1-or-later
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General
18  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gioprivate.h"
29 #include "gseekable.h"
30 #include "gcancellable.h"
31 #include "gasyncresult.h"
32 #include "gioerror.h"
33 #include "gpollableinputstream.h"
34
35 /**
36  * GInputStream:
37  *
38  * `GInputStream` is a base class for implementing streaming input.
39  *
40  * It has functions to read from a stream ([method@Gio.InputStream.read]),
41  * to close a stream ([method@Gio.InputStream.close]) and to skip some content
42  * ([method@Gio.InputStream.skip]).
43  *
44  * To copy the content of an input stream to an output stream without 
45  * manually handling the reads and writes, use [method@Gio.OutputStream.splice].
46  *
47  * See the documentation for [class@Gio.IOStream] for details of thread safety
48  * of streaming APIs.
49  *
50  * All of these functions have async variants too.
51  **/
52
53 struct _GInputStreamPrivate {
54   guint closed : 1;
55   guint pending : 1;
56   GAsyncReadyCallback outstanding_callback;
57 };
58
59 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
60
61 static gssize   g_input_stream_real_skip         (GInputStream         *stream,
62                                                   gsize                 count,
63                                                   GCancellable         *cancellable,
64                                                   GError              **error);
65 static void     g_input_stream_real_read_async   (GInputStream         *stream,
66                                                   void                 *buffer,
67                                                   gsize                 count,
68                                                   int                   io_priority,
69                                                   GCancellable         *cancellable,
70                                                   GAsyncReadyCallback   callback,
71                                                   gpointer              user_data);
72 static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
73                                                   GAsyncResult         *result,
74                                                   GError              **error);
75 static void     g_input_stream_real_skip_async   (GInputStream         *stream,
76                                                   gsize                 count,
77                                                   int                   io_priority,
78                                                   GCancellable         *cancellable,
79                                                   GAsyncReadyCallback   callback,
80                                                   gpointer              data);
81 static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
82                                                   GAsyncResult         *result,
83                                                   GError              **error);
84 static void     g_input_stream_real_close_async  (GInputStream         *stream,
85                                                   int                   io_priority,
86                                                   GCancellable         *cancellable,
87                                                   GAsyncReadyCallback   callback,
88                                                   gpointer              data);
89 static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
90                                                   GAsyncResult         *result,
91                                                   GError              **error);
92
93 static void
94 g_input_stream_dispose (GObject *object)
95 {
96   GInputStream *stream;
97
98   stream = G_INPUT_STREAM (object);
99   
100   if (!stream->priv->closed)
101     g_input_stream_close (stream, NULL, NULL);
102
103   G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
104 }
105
106
107 static void
108 g_input_stream_class_init (GInputStreamClass *klass)
109 {
110   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
111   
112   gobject_class->dispose = g_input_stream_dispose;
113   
114   klass->skip = g_input_stream_real_skip;
115   klass->read_async = g_input_stream_real_read_async;
116   klass->read_finish = g_input_stream_real_read_finish;
117   klass->skip_async = g_input_stream_real_skip_async;
118   klass->skip_finish = g_input_stream_real_skip_finish;
119   klass->close_async = g_input_stream_real_close_async;
120   klass->close_finish = g_input_stream_real_close_finish;
121 }
122
123 static void
124 g_input_stream_init (GInputStream *stream)
125 {
126   stream->priv = g_input_stream_get_instance_private (stream);
127 }
128
129 /**
130  * g_input_stream_read:
131  * @stream: a #GInputStream.
132  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
133  *     a buffer to read data into (which should be at least count bytes long).
134  * @count: (in): the number of bytes that will be read from the stream
135  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
136  * @error: location to store the error occurring, or %NULL to ignore
137  *
138  * Tries to read @count bytes from the stream into the buffer starting at
139  * @buffer. Will block during this read.
140  * 
141  * If count is zero returns zero and does nothing. A value of @count
142  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
143  *
144  * On success, the number of bytes read into the buffer is returned.
145  * It is not an error if this is not the same as the requested size, as it
146  * can happen e.g. near the end of a file. Zero is returned on end of file
147  * (or if @count is zero),  but never otherwise.
148  *
149  * The returned @buffer is not a nul-terminated string, it can contain nul bytes
150  * at any position, and this function doesn't nul-terminate the @buffer.
151  *
152  * If @cancellable is not %NULL, then the operation can be cancelled by
153  * triggering the cancellable object from another thread. If the operation
154  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
155  * operation was partially finished when the operation was cancelled the
156  * partial result will be returned, without an error.
157  *
158  * On error -1 is returned and @error is set accordingly.
159  * 
160  * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
161  **/
162 gssize
163 g_input_stream_read  (GInputStream  *stream,
164                       void          *buffer,
165                       gsize          count,
166                       GCancellable  *cancellable,
167                       GError       **error)
168 {
169   GInputStreamClass *class;
170   gssize res;
171
172   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
173   g_return_val_if_fail (buffer != NULL, 0);
174
175   if (count == 0)
176     return 0;
177   
178   if (((gssize) count) < 0)
179     {
180       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
181                    _("Too large count value passed to %s"), G_STRFUNC);
182       return -1;
183     }
184
185   class = G_INPUT_STREAM_GET_CLASS (stream);
186
187   if (class->read_fn == NULL) 
188     {
189       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
190                            _("Input stream doesn’t implement read"));
191       return -1;
192     }
193
194   if (!g_input_stream_set_pending (stream, error))
195     return -1;
196
197   if (cancellable)
198     g_cancellable_push_current (cancellable);
199   
200   res = class->read_fn (stream, buffer, count, cancellable, error);
201
202   if (cancellable)
203     g_cancellable_pop_current (cancellable);
204   
205   g_input_stream_clear_pending (stream);
206
207   return res;
208 }
209
210 /**
211  * g_input_stream_read_all:
212  * @stream: a #GInputStream.
213  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
214  *     a buffer to read data into (which should be at least count bytes long).
215  * @count: (in): the number of bytes that will be read from the stream
216  * @bytes_read: (out): location to store the number of bytes that was read from the stream
217  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
218  * @error: location to store the error occurring, or %NULL to ignore
219  *
220  * Tries to read @count bytes from the stream into the buffer starting at
221  * @buffer. Will block during this read.
222  *
223  * This function is similar to g_input_stream_read(), except it tries to
224  * read as many bytes as requested, only stopping on an error or end of stream.
225  *
226  * On a successful read of @count bytes, or if we reached the end of the
227  * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
228  * read into @buffer.
229  * 
230  * If there is an error during the operation %FALSE is returned and @error
231  * is set to indicate the error status.
232  *
233  * As a special exception to the normal conventions for functions that
234  * use #GError, if this function returns %FALSE (and sets @error) then
235  * @bytes_read will be set to the number of bytes that were successfully
236  * read before the error was encountered.  This functionality is only
237  * available from C.  If you need it from another language then you must
238  * write your own loop around g_input_stream_read().
239  *
240  * Returns: %TRUE on success, %FALSE if there was an error
241  **/
242 gboolean
243 g_input_stream_read_all (GInputStream  *stream,
244                          void          *buffer,
245                          gsize          count,
246                          gsize         *bytes_read,
247                          GCancellable  *cancellable,
248                          GError       **error)
249 {
250   gsize _bytes_read;
251   gssize res;
252
253   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
254   g_return_val_if_fail (buffer != NULL, FALSE);
255
256   _bytes_read = 0;
257   while (_bytes_read < count)
258     {
259       res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
260                                  cancellable, error);
261       if (res == -1)
262         {
263           if (bytes_read)
264             *bytes_read = _bytes_read;
265           return FALSE;
266         }
267       
268       if (res == 0)
269         break;
270
271       _bytes_read += res;
272     }
273
274   if (bytes_read)
275     *bytes_read = _bytes_read;
276   return TRUE;
277 }
278
279 /**
280  * g_input_stream_read_bytes:
281  * @stream: a #GInputStream.
282  * @count: maximum number of bytes that will be read from the stream. Common
283  * values include 4096 and 8192.
284  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
285  * @error: location to store the error occurring, or %NULL to ignore
286  *
287  * Like g_input_stream_read(), this tries to read @count bytes from
288  * the stream in a blocking fashion. However, rather than reading into
289  * a user-supplied buffer, this will create a new #GBytes containing
290  * the data that was read. This may be easier to use from language
291  * bindings.
292  *
293  * If count is zero, returns a zero-length #GBytes and does nothing. A
294  * value of @count larger than %G_MAXSSIZE will cause a
295  * %G_IO_ERROR_INVALID_ARGUMENT error.
296  *
297  * On success, a new #GBytes is returned. It is not an error if the
298  * size of this object is not the same as the requested size, as it
299  * can happen e.g. near the end of a file. A zero-length #GBytes is
300  * returned on end of file (or if @count is zero), but never
301  * otherwise.
302  *
303  * If @cancellable is not %NULL, then the operation can be cancelled by
304  * triggering the cancellable object from another thread. If the operation
305  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
306  * operation was partially finished when the operation was cancelled the
307  * partial result will be returned, without an error.
308  *
309  * On error %NULL is returned and @error is set accordingly.
310  *
311  * Returns: (transfer full): a new #GBytes, or %NULL on error
312  *
313  * Since: 2.34
314  **/
315 GBytes *
316 g_input_stream_read_bytes (GInputStream  *stream,
317                            gsize          count,
318                            GCancellable  *cancellable,
319                            GError       **error)
320 {
321   guchar *buf;
322   gssize nread;
323
324   buf = g_malloc (count);
325   nread = g_input_stream_read (stream, buf, count, cancellable, error);
326   if (nread == -1)
327     {
328       g_free (buf);
329       return NULL;
330     }
331   else if (nread == 0)
332     {
333       g_free (buf);
334       return g_bytes_new_static ("", 0);
335     }
336   else
337     return g_bytes_new_take (buf, nread);
338 }
339
340 /**
341  * g_input_stream_skip:
342  * @stream: a #GInputStream.
343  * @count: the number of bytes that will be skipped from the stream
344  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
345  * @error: location to store the error occurring, or %NULL to ignore
346  *
347  * Tries to skip @count bytes from the stream. Will block during the operation.
348  *
349  * This is identical to g_input_stream_read(), from a behaviour standpoint,
350  * but the bytes that are skipped are not returned to the user. Some
351  * streams have an implementation that is more efficient than reading the data.
352  *
353  * This function is optional for inherited classes, as the default implementation
354  * emulates it using read.
355  *
356  * If @cancellable is not %NULL, then the operation can be cancelled by
357  * triggering the cancellable object from another thread. If the operation
358  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
359  * operation was partially finished when the operation was cancelled the
360  * partial result will be returned, without an error.
361  *
362  * Returns: Number of bytes skipped, or -1 on error
363  **/
364 gssize
365 g_input_stream_skip (GInputStream  *stream,
366                      gsize          count,
367                      GCancellable  *cancellable,
368                      GError       **error)
369 {
370   GInputStreamClass *class;
371   gssize res;
372
373   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
374
375   if (count == 0)
376     return 0;
377
378   if (((gssize) count) < 0)
379     {
380       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
381                    _("Too large count value passed to %s"), G_STRFUNC);
382       return -1;
383     }
384   
385   class = G_INPUT_STREAM_GET_CLASS (stream);
386
387   if (!g_input_stream_set_pending (stream, error))
388     return -1;
389
390   if (cancellable)
391     g_cancellable_push_current (cancellable);
392   
393   res = class->skip (stream, count, cancellable, error);
394
395   if (cancellable)
396     g_cancellable_pop_current (cancellable);
397   
398   g_input_stream_clear_pending (stream);
399
400   return res;
401 }
402
403 static gssize
404 g_input_stream_real_skip (GInputStream  *stream,
405                           gsize          count,
406                           GCancellable  *cancellable,
407                           GError       **error)
408 {
409   GInputStreamClass *class;
410   gssize ret, read_bytes;
411   char buffer[8192];
412   GError *my_error;
413
414   if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
415     {
416       GSeekable *seekable = G_SEEKABLE (stream);
417       goffset start, end;
418       gboolean success;
419
420       /* g_seekable_seek() may try to set pending itself */
421       stream->priv->pending = FALSE;
422
423       start = g_seekable_tell (seekable);
424
425       if (g_seekable_seek (G_SEEKABLE (stream),
426                            0,
427                            G_SEEK_END,
428                            cancellable,
429                            NULL))
430         {
431           end = g_seekable_tell (seekable);
432           g_assert (start >= 0);
433           g_assert (end >= start);
434           if (start > (goffset) (G_MAXOFFSET - count) ||
435               (goffset) (start + count) > end)
436             {
437               stream->priv->pending = TRUE;
438               return end - start;
439             }
440
441           success = g_seekable_seek (G_SEEKABLE (stream),
442                                      start + count,
443                                      G_SEEK_SET,
444                                      cancellable,
445                                      error);
446           stream->priv->pending = TRUE;
447
448           if (success)
449             return count;
450           else
451             return -1;
452         }
453     }
454
455   /* If not seekable, or seek failed, fall back to reading data: */
456
457   class = G_INPUT_STREAM_GET_CLASS (stream);
458
459   read_bytes = 0;
460   while (1)
461     {
462       my_error = NULL;
463
464       ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
465                             cancellable, &my_error);
466       if (ret == -1)
467         {
468           if (read_bytes > 0 &&
469               my_error->domain == G_IO_ERROR &&
470               my_error->code == G_IO_ERROR_CANCELLED)
471             {
472               g_error_free (my_error);
473               return read_bytes;
474             }
475
476           g_propagate_error (error, my_error);
477           return -1;
478         }
479
480       count -= ret;
481       read_bytes += ret;
482
483       if (ret == 0 || count == 0)
484         return read_bytes;
485     }
486 }
487
488 /**
489  * g_input_stream_close:
490  * @stream: A #GInputStream.
491  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
492  * @error: location to store the error occurring, or %NULL to ignore
493  *
494  * Closes the stream, releasing resources related to it.
495  *
496  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
497  * Closing a stream multiple times will not return an error.
498  *
499  * Streams will be automatically closed when the last reference
500  * is dropped, but you might want to call this function to make sure 
501  * resources are released as early as possible.
502  *
503  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
504  * open after the stream is closed. See the documentation for the individual
505  * stream for details.
506  *
507  * On failure the first error that happened will be reported, but the close
508  * operation will finish as much as possible. A stream that failed to
509  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
510  * is important to check and report the error to the user.
511  *
512  * If @cancellable is not %NULL, then the operation can be cancelled by
513  * triggering the cancellable object from another thread. If the operation
514  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
515  * Cancelling a close will still leave the stream closed, but some streams
516  * can use a faster close that doesn't block to e.g. check errors. 
517  *
518  * Returns: %TRUE on success, %FALSE on failure
519  **/
520 gboolean
521 g_input_stream_close (GInputStream  *stream,
522                       GCancellable  *cancellable,
523                       GError       **error)
524 {
525   GInputStreamClass *class;
526   gboolean res;
527
528   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
529
530   class = G_INPUT_STREAM_GET_CLASS (stream);
531
532   if (stream->priv->closed)
533     return TRUE;
534
535   res = TRUE;
536
537   if (!g_input_stream_set_pending (stream, error))
538     return FALSE;
539
540   if (cancellable)
541     g_cancellable_push_current (cancellable);
542
543   if (class->close_fn)
544     res = class->close_fn (stream, cancellable, error);
545
546   if (cancellable)
547     g_cancellable_pop_current (cancellable);
548
549   g_input_stream_clear_pending (stream);
550   
551   stream->priv->closed = TRUE;
552   
553   return res;
554 }
555
556 static void
557 async_ready_callback_wrapper (GObject      *source_object,
558                               GAsyncResult *res,
559                               gpointer      user_data)
560 {
561   GInputStream *stream = G_INPUT_STREAM (source_object);
562
563   g_input_stream_clear_pending (stream);
564   if (stream->priv->outstanding_callback)
565     (*stream->priv->outstanding_callback) (source_object, res, user_data);
566   g_object_unref (stream);
567 }
568
569 static void
570 async_ready_close_callback_wrapper (GObject      *source_object,
571                                     GAsyncResult *res,
572                                     gpointer      user_data)
573 {
574   GInputStream *stream = G_INPUT_STREAM (source_object);
575
576   g_input_stream_clear_pending (stream);
577   stream->priv->closed = TRUE;
578   if (stream->priv->outstanding_callback)
579     (*stream->priv->outstanding_callback) (source_object, res, user_data);
580   g_object_unref (stream);
581 }
582
583 /**
584  * g_input_stream_read_async:
585  * @stream: A #GInputStream.
586  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
587  *     a buffer to read data into (which should be at least count bytes long).
588  * @count: (in): the number of bytes that will be read from the stream
589  * @io_priority: the [I/O priority][io-priority]
590  * of the request. 
591  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
592  * @callback: (scope async): a #GAsyncReadyCallback
593  *   to call when the request is satisfied
594  * @user_data: the data to pass to callback function
595  *
596  * Request an asynchronous read of @count bytes from the stream into the buffer
597  * starting at @buffer. When the operation is finished @callback will be called. 
598  * You can then call g_input_stream_read_finish() to get the result of the 
599  * operation.
600  *
601  * During an async request no other sync and async calls are allowed on @stream, and will
602  * result in %G_IO_ERROR_PENDING errors. 
603  *
604  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
605  *
606  * On success, the number of bytes read into the buffer will be passed to the
607  * callback. It is not an error if this is not the same as the requested size, as it
608  * can happen e.g. near the end of a file, but generally we try to read
609  * as many bytes as requested. Zero is returned on end of file
610  * (or if @count is zero),  but never otherwise.
611  *
612  * Any outstanding i/o request with higher priority (lower numerical value) will
613  * be executed before an outstanding request with lower priority. Default
614  * priority is %G_PRIORITY_DEFAULT.
615  *
616  * The asynchronous methods have a default fallback that uses threads to implement
617  * asynchronicity, so they are optional for inheriting classes. However, if you
618  * override one you must override all.
619  **/
620 void
621 g_input_stream_read_async (GInputStream        *stream,
622                            void                *buffer,
623                            gsize                count,
624                            int                  io_priority,
625                            GCancellable        *cancellable,
626                            GAsyncReadyCallback  callback,
627                            gpointer             user_data)
628 {
629   GInputStreamClass *class;
630   GError *error = NULL;
631
632   g_return_if_fail (G_IS_INPUT_STREAM (stream));
633   g_return_if_fail (buffer != NULL);
634
635   if (count == 0)
636     {
637       GTask *task;
638
639       task = g_task_new (stream, cancellable, callback, user_data);
640       g_task_set_source_tag (task, g_input_stream_read_async);
641       g_task_return_int (task, 0);
642       g_object_unref (task);
643       return;
644     }
645   
646   if (((gssize) count) < 0)
647     {
648       g_task_report_new_error (stream, callback, user_data,
649                                g_input_stream_read_async,
650                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
651                                _("Too large count value passed to %s"),
652                                G_STRFUNC);
653       return;
654     }
655
656   if (!g_input_stream_set_pending (stream, &error))
657     {
658       g_task_report_error (stream, callback, user_data,
659                            g_input_stream_read_async,
660                            error);
661       return;
662     }
663
664   class = G_INPUT_STREAM_GET_CLASS (stream);
665   stream->priv->outstanding_callback = callback;
666   g_object_ref (stream);
667   class->read_async (stream, buffer, count, io_priority, cancellable,
668                      async_ready_callback_wrapper, user_data);
669 }
670
671 /**
672  * g_input_stream_read_finish:
673  * @stream: a #GInputStream.
674  * @result: a #GAsyncResult.
675  * @error: a #GError location to store the error occurring, or %NULL to 
676  * ignore.
677  * 
678  * Finishes an asynchronous stream read operation. 
679  * 
680  * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
681  **/
682 gssize
683 g_input_stream_read_finish (GInputStream  *stream,
684                             GAsyncResult  *result,
685                             GError       **error)
686 {
687   GInputStreamClass *class;
688   
689   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
690   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
691
692   if (g_async_result_legacy_propagate_error (result, error))
693     return -1;
694   else if (g_async_result_is_tagged (result, g_input_stream_read_async))
695     return g_task_propagate_int (G_TASK (result), error);
696
697   class = G_INPUT_STREAM_GET_CLASS (stream);
698   return class->read_finish (stream, result, error);
699 }
700
701 typedef struct
702 {
703   gchar *buffer;
704   gsize to_read;
705   gsize bytes_read;
706 } AsyncReadAll;
707
708 static void
709 free_async_read_all (gpointer data)
710 {
711   g_slice_free (AsyncReadAll, data);
712 }
713
714 static void
715 read_all_callback (GObject      *stream,
716                    GAsyncResult *result,
717                    gpointer      user_data)
718 {
719   GTask *task = user_data;
720   AsyncReadAll *data = g_task_get_task_data (task);
721   gboolean got_eof = FALSE;
722
723   if (result)
724     {
725       GError *error = NULL;
726       gssize nread;
727
728       nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
729
730       if (nread == -1)
731         {
732           g_task_return_error (task, error);
733           g_object_unref (task);
734           return;
735         }
736
737       g_assert_cmpint (nread, <=, data->to_read);
738       data->to_read -= nread;
739       data->bytes_read += nread;
740       got_eof = (nread == 0);
741     }
742
743   if (got_eof || data->to_read == 0)
744     {
745       g_task_return_boolean (task, TRUE);
746       g_object_unref (task);
747     }
748
749   else
750     g_input_stream_read_async (G_INPUT_STREAM (stream),
751                                data->buffer + data->bytes_read,
752                                data->to_read,
753                                g_task_get_priority (task),
754                                g_task_get_cancellable (task),
755                                read_all_callback, task);
756 }
757
758
759 static void
760 read_all_async_thread (GTask        *task,
761                        gpointer      source_object,
762                        gpointer      task_data,
763                        GCancellable *cancellable)
764 {
765   GInputStream *stream = source_object;
766   AsyncReadAll *data = task_data;
767   GError *error = NULL;
768
769   if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
770                                g_task_get_cancellable (task), &error))
771     g_task_return_boolean (task, TRUE);
772   else
773     g_task_return_error (task, error);
774 }
775
776 /**
777  * g_input_stream_read_all_async:
778  * @stream: A #GInputStream
779  * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
780  *     a buffer to read data into (which should be at least count bytes long)
781  * @count: (in): the number of bytes that will be read from the stream
782  * @io_priority: the [I/O priority][io-priority] of the request
783  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
784  * @callback: (scope async): a #GAsyncReadyCallback
785  *   to call when the request is satisfied
786  * @user_data: the data to pass to callback function
787  *
788  * Request an asynchronous read of @count bytes from the stream into the
789  * buffer starting at @buffer.
790  *
791  * This is the asynchronous equivalent of g_input_stream_read_all().
792  *
793  * Call g_input_stream_read_all_finish() to collect the result.
794  *
795  * Any outstanding I/O request with higher priority (lower numerical
796  * value) will be executed before an outstanding request with lower
797  * priority. Default priority is %G_PRIORITY_DEFAULT.
798  *
799  * Since: 2.44
800  **/
801 void
802 g_input_stream_read_all_async (GInputStream        *stream,
803                                void                *buffer,
804                                gsize                count,
805                                int                  io_priority,
806                                GCancellable        *cancellable,
807                                GAsyncReadyCallback  callback,
808                                gpointer             user_data)
809 {
810   AsyncReadAll *data;
811   GTask *task;
812
813   g_return_if_fail (G_IS_INPUT_STREAM (stream));
814   g_return_if_fail (buffer != NULL || count == 0);
815
816   task = g_task_new (stream, cancellable, callback, user_data);
817   data = g_slice_new0 (AsyncReadAll);
818   data->buffer = buffer;
819   data->to_read = count;
820
821   g_task_set_source_tag (task, g_input_stream_read_all_async);
822   g_task_set_task_data (task, data, free_async_read_all);
823   g_task_set_priority (task, io_priority);
824
825   /* If async reads are going to be handled via the threadpool anyway
826    * then we may as well do it with a single dispatch instead of
827    * bouncing in and out.
828    */
829   if (g_input_stream_async_read_is_via_threads (stream))
830     {
831       g_task_run_in_thread (task, read_all_async_thread);
832       g_object_unref (task);
833     }
834   else
835     read_all_callback (G_OBJECT (stream), NULL, task);
836 }
837
838 /**
839  * g_input_stream_read_all_finish:
840  * @stream: a #GInputStream
841  * @result: a #GAsyncResult
842  * @bytes_read: (out): location to store the number of bytes that was read from the stream
843  * @error: a #GError location to store the error occurring, or %NULL to ignore
844  *
845  * Finishes an asynchronous stream read operation started with
846  * g_input_stream_read_all_async().
847  *
848  * As a special exception to the normal conventions for functions that
849  * use #GError, if this function returns %FALSE (and sets @error) then
850  * @bytes_read will be set to the number of bytes that were successfully
851  * read before the error was encountered.  This functionality is only
852  * available from C.  If you need it from another language then you must
853  * write your own loop around g_input_stream_read_async().
854  *
855  * Returns: %TRUE on success, %FALSE if there was an error
856  *
857  * Since: 2.44
858  **/
859 gboolean
860 g_input_stream_read_all_finish (GInputStream  *stream,
861                                 GAsyncResult  *result,
862                                 gsize         *bytes_read,
863                                 GError       **error)
864 {
865   GTask *task;
866
867   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
868   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
869
870   task = G_TASK (result);
871
872   if (bytes_read)
873     {
874       AsyncReadAll *data = g_task_get_task_data (task);
875
876       *bytes_read = data->bytes_read;
877     }
878
879   return g_task_propagate_boolean (task, error);
880 }
881
882 static void
883 read_bytes_callback (GObject      *stream,
884                      GAsyncResult *result,
885                      gpointer      user_data)
886 {
887   GTask *task = user_data;
888   guchar *buf = g_task_get_task_data (task);
889   GError *error = NULL;
890   gssize nread;
891   GBytes *bytes = NULL;
892
893   nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
894                                       result, &error);
895   if (nread == -1)
896     {
897       g_free (buf);
898       g_task_return_error (task, error);
899     }
900   else if (nread == 0)
901     {
902       g_free (buf);
903       bytes = g_bytes_new_static ("", 0);
904     }
905   else
906     bytes = g_bytes_new_take (buf, nread);
907
908   if (bytes)
909     g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
910
911   g_object_unref (task);
912 }
913
914 /**
915  * g_input_stream_read_bytes_async:
916  * @stream: A #GInputStream.
917  * @count: the number of bytes that will be read from the stream
918  * @io_priority: the [I/O priority][io-priority] of the request
919  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
920  * @callback: (scope async): a #GAsyncReadyCallback
921  *   to call when the request is satisfied
922  * @user_data: the data to pass to callback function
923  *
924  * Request an asynchronous read of @count bytes from the stream into a
925  * new #GBytes. When the operation is finished @callback will be
926  * called. You can then call g_input_stream_read_bytes_finish() to get the
927  * result of the operation.
928  *
929  * During an async request no other sync and async calls are allowed
930  * on @stream, and will result in %G_IO_ERROR_PENDING errors.
931  *
932  * A value of @count larger than %G_MAXSSIZE will cause a
933  * %G_IO_ERROR_INVALID_ARGUMENT error.
934  *
935  * On success, the new #GBytes will be passed to the callback. It is
936  * not an error if this is smaller than the requested size, as it can
937  * happen e.g. near the end of a file, but generally we try to read as
938  * many bytes as requested. Zero is returned on end of file (or if
939  * @count is zero), but never otherwise.
940  *
941  * Any outstanding I/O request with higher priority (lower numerical
942  * value) will be executed before an outstanding request with lower
943  * priority. Default priority is %G_PRIORITY_DEFAULT.
944  *
945  * Since: 2.34
946  **/
947 void
948 g_input_stream_read_bytes_async (GInputStream          *stream,
949                                  gsize                  count,
950                                  int                    io_priority,
951                                  GCancellable          *cancellable,
952                                  GAsyncReadyCallback    callback,
953                                  gpointer               user_data)
954 {
955   GTask *task;
956   guchar *buf;
957
958   task = g_task_new (stream, cancellable, callback, user_data);
959   g_task_set_source_tag (task, g_input_stream_read_bytes_async);
960
961   buf = g_malloc (count);
962   g_task_set_task_data (task, buf, NULL);
963
964   g_input_stream_read_async (stream, buf, count,
965                              io_priority, cancellable,
966                              read_bytes_callback, task);
967 }
968
969 /**
970  * g_input_stream_read_bytes_finish:
971  * @stream: a #GInputStream.
972  * @result: a #GAsyncResult.
973  * @error: a #GError location to store the error occurring, or %NULL to
974  *   ignore.
975  *
976  * Finishes an asynchronous stream read-into-#GBytes operation.
977  *
978  * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
979  *
980  * Since: 2.34
981  **/
982 GBytes *
983 g_input_stream_read_bytes_finish (GInputStream  *stream,
984                                   GAsyncResult  *result,
985                                   GError       **error)
986 {
987   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
988   g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
989
990   return g_task_propagate_pointer (G_TASK (result), error);
991 }
992
993 /**
994  * g_input_stream_skip_async:
995  * @stream: A #GInputStream.
996  * @count: the number of bytes that will be skipped from the stream
997  * @io_priority: the [I/O priority][io-priority] of the request
998  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
999  * @callback: (scope async): a #GAsyncReadyCallback
1000  *   to call when the request is satisfied
1001  * @user_data: the data to pass to callback function
1002  *
1003  * Request an asynchronous skip of @count bytes from the stream.
1004  * When the operation is finished @callback will be called.
1005  * You can then call g_input_stream_skip_finish() to get the result
1006  * of the operation.
1007  *
1008  * During an async request no other sync and async calls are allowed,
1009  * and will result in %G_IO_ERROR_PENDING errors.
1010  *
1011  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1012  *
1013  * On success, the number of bytes skipped will be passed to the callback.
1014  * It is not an error if this is not the same as the requested size, as it
1015  * can happen e.g. near the end of a file, but generally we try to skip
1016  * as many bytes as requested. Zero is returned on end of file
1017  * (or if @count is zero), but never otherwise.
1018  *
1019  * Any outstanding i/o request with higher priority (lower numerical value)
1020  * will be executed before an outstanding request with lower priority.
1021  * Default priority is %G_PRIORITY_DEFAULT.
1022  *
1023  * The asynchronous methods have a default fallback that uses threads to
1024  * implement asynchronicity, so they are optional for inheriting classes.
1025  * However, if you override one, you must override all.
1026  **/
1027 void
1028 g_input_stream_skip_async (GInputStream        *stream,
1029                            gsize                count,
1030                            int                  io_priority,
1031                            GCancellable        *cancellable,
1032                            GAsyncReadyCallback  callback,
1033                            gpointer             user_data)
1034 {
1035   GInputStreamClass *class;
1036   GError *error = NULL;
1037
1038   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1039
1040   if (count == 0)
1041     {
1042       GTask *task;
1043
1044       task = g_task_new (stream, cancellable, callback, user_data);
1045       g_task_set_source_tag (task, g_input_stream_skip_async);
1046       g_task_return_int (task, 0);
1047       g_object_unref (task);
1048       return;
1049     }
1050   
1051   if (((gssize) count) < 0)
1052     {
1053       g_task_report_new_error (stream, callback, user_data,
1054                                g_input_stream_skip_async,
1055                                G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1056                                _("Too large count value passed to %s"),
1057                                G_STRFUNC);
1058       return;
1059     }
1060
1061   if (!g_input_stream_set_pending (stream, &error))
1062     {
1063       g_task_report_error (stream, callback, user_data,
1064                            g_input_stream_skip_async,
1065                            error);
1066       return;
1067     }
1068
1069   class = G_INPUT_STREAM_GET_CLASS (stream);
1070   stream->priv->outstanding_callback = callback;
1071   g_object_ref (stream);
1072   class->skip_async (stream, count, io_priority, cancellable,
1073                      async_ready_callback_wrapper, user_data);
1074 }
1075
1076 /**
1077  * g_input_stream_skip_finish:
1078  * @stream: a #GInputStream.
1079  * @result: a #GAsyncResult.
1080  * @error: a #GError location to store the error occurring, or %NULL to 
1081  * ignore.
1082  * 
1083  * Finishes a stream skip operation.
1084  * 
1085  * Returns: the size of the bytes skipped, or `-1` on error.
1086  **/
1087 gssize
1088 g_input_stream_skip_finish (GInputStream  *stream,
1089                             GAsyncResult  *result,
1090                             GError       **error)
1091 {
1092   GInputStreamClass *class;
1093
1094   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1095   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1096
1097   if (g_async_result_legacy_propagate_error (result, error))
1098     return -1;
1099   else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1100     return g_task_propagate_int (G_TASK (result), error);
1101
1102   class = G_INPUT_STREAM_GET_CLASS (stream);
1103   return class->skip_finish (stream, result, error);
1104 }
1105
1106 /**
1107  * g_input_stream_close_async:
1108  * @stream: A #GInputStream.
1109  * @io_priority: the [I/O priority][io-priority] of the request
1110  * @cancellable: (nullable): optional cancellable object
1111  * @callback: (scope async): a #GAsyncReadyCallback
1112  *   to call when the request is satisfied
1113  * @user_data: the data to pass to callback function
1114  *
1115  * Requests an asynchronous closes of the stream, releasing resources related to it.
1116  * When the operation is finished @callback will be called. 
1117  * You can then call g_input_stream_close_finish() to get the result of the 
1118  * operation.
1119  *
1120  * For behaviour details see g_input_stream_close().
1121  *
1122  * The asynchronous methods have a default fallback that uses threads to implement
1123  * asynchronicity, so they are optional for inheriting classes. However, if you
1124  * override one you must override all.
1125  **/
1126 void
1127 g_input_stream_close_async (GInputStream        *stream,
1128                             int                  io_priority,
1129                             GCancellable        *cancellable,
1130                             GAsyncReadyCallback  callback,
1131                             gpointer             user_data)
1132 {
1133   GInputStreamClass *class;
1134   GError *error = NULL;
1135
1136   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1137
1138   if (stream->priv->closed)
1139     {
1140       GTask *task;
1141
1142       task = g_task_new (stream, cancellable, callback, user_data);
1143       g_task_set_source_tag (task, g_input_stream_close_async);
1144       g_task_return_boolean (task, TRUE);
1145       g_object_unref (task);
1146       return;
1147     }
1148
1149   if (!g_input_stream_set_pending (stream, &error))
1150     {
1151       g_task_report_error (stream, callback, user_data,
1152                            g_input_stream_close_async,
1153                            error);
1154       return;
1155     }
1156   
1157   class = G_INPUT_STREAM_GET_CLASS (stream);
1158   stream->priv->outstanding_callback = callback;
1159   g_object_ref (stream);
1160   class->close_async (stream, io_priority, cancellable,
1161                       async_ready_close_callback_wrapper, user_data);
1162 }
1163
1164 /**
1165  * g_input_stream_close_finish:
1166  * @stream: a #GInputStream.
1167  * @result: a #GAsyncResult.
1168  * @error: a #GError location to store the error occurring, or %NULL to 
1169  * ignore.
1170  * 
1171  * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1172  * 
1173  * Returns: %TRUE if the stream was closed successfully.
1174  **/
1175 gboolean
1176 g_input_stream_close_finish (GInputStream  *stream,
1177                              GAsyncResult  *result,
1178                              GError       **error)
1179 {
1180   GInputStreamClass *class;
1181
1182   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1183   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1184
1185   if (g_async_result_legacy_propagate_error (result, error))
1186     return FALSE;
1187   else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1188     return g_task_propagate_boolean (G_TASK (result), error);
1189
1190   class = G_INPUT_STREAM_GET_CLASS (stream);
1191   return class->close_finish (stream, result, error);
1192 }
1193
1194 /**
1195  * g_input_stream_is_closed:
1196  * @stream: input stream.
1197  * 
1198  * Checks if an input stream is closed.
1199  * 
1200  * Returns: %TRUE if the stream is closed.
1201  **/
1202 gboolean
1203 g_input_stream_is_closed (GInputStream *stream)
1204 {
1205   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1206   
1207   return stream->priv->closed;
1208 }
1209  
1210 /**
1211  * g_input_stream_has_pending:
1212  * @stream: input stream.
1213  * 
1214  * Checks if an input stream has pending actions.
1215  * 
1216  * Returns: %TRUE if @stream has pending actions.
1217  **/  
1218 gboolean
1219 g_input_stream_has_pending (GInputStream *stream)
1220 {
1221   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1222   
1223   return stream->priv->pending;
1224 }
1225
1226 /**
1227  * g_input_stream_set_pending:
1228  * @stream: input stream
1229  * @error: a #GError location to store the error occurring, or %NULL to 
1230  * ignore.
1231  * 
1232  * Sets @stream to have actions pending. If the pending flag is
1233  * already set or @stream is closed, it will return %FALSE and set
1234  * @error.
1235  *
1236  * Returns: %TRUE if pending was previously unset and is now set.
1237  **/
1238 gboolean
1239 g_input_stream_set_pending (GInputStream *stream, GError **error)
1240 {
1241   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1242   
1243   if (stream->priv->closed)
1244     {
1245       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1246                            _("Stream is already closed"));
1247       return FALSE;
1248     }
1249   
1250   if (stream->priv->pending)
1251     {
1252       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1253                 /* Translators: This is an error you get if there is already an
1254                  * operation running against this stream when you try to start
1255                  * one */
1256                  _("Stream has outstanding operation"));
1257       return FALSE;
1258     }
1259   
1260   stream->priv->pending = TRUE;
1261   return TRUE;
1262 }
1263
1264 /**
1265  * g_input_stream_clear_pending:
1266  * @stream: input stream
1267  * 
1268  * Clears the pending flag on @stream.
1269  **/
1270 void
1271 g_input_stream_clear_pending (GInputStream *stream)
1272 {
1273   g_return_if_fail (G_IS_INPUT_STREAM (stream));
1274   
1275   stream->priv->pending = FALSE;
1276 }
1277
1278 /*< internal >
1279  * g_input_stream_async_read_is_via_threads:
1280  * @stream: input stream
1281  *
1282  * Checks if an input stream's read_async function uses threads.
1283  *
1284  * Returns: %TRUE if @stream's read_async function uses threads.
1285  **/
1286 gboolean
1287 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1288 {
1289   GInputStreamClass *class;
1290
1291   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1292
1293   class = G_INPUT_STREAM_GET_CLASS (stream);
1294
1295   return (class->read_async == g_input_stream_real_read_async &&
1296       !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1297         g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1298 }
1299
1300 /*< internal >
1301  * g_input_stream_async_close_is_via_threads:
1302  * @stream: input stream
1303  *
1304  * Checks if an input stream's close_async function uses threads.
1305  *
1306  * Returns: %TRUE if @stream's close_async function uses threads.
1307  **/
1308 gboolean
1309 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1310 {
1311   GInputStreamClass *class;
1312
1313   g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1314
1315   class = G_INPUT_STREAM_GET_CLASS (stream);
1316
1317   return class->close_async == g_input_stream_real_close_async;
1318 }
1319
1320 /********************************************
1321  *   Default implementation of async ops    *
1322  ********************************************/
1323
1324 typedef struct {
1325   void   *buffer;
1326   gsize   count;
1327 } ReadData;
1328
1329 static void
1330 free_read_data (ReadData *op)
1331 {
1332   g_slice_free (ReadData, op);
1333 }
1334
1335 static void
1336 read_async_thread (GTask        *task,
1337                    gpointer      source_object,
1338                    gpointer      task_data,
1339                    GCancellable *cancellable)
1340 {
1341   GInputStream *stream = source_object;
1342   ReadData *op = task_data;
1343   GInputStreamClass *class;
1344   GError *error = NULL;
1345   gssize nread;
1346  
1347   class = G_INPUT_STREAM_GET_CLASS (stream);
1348
1349   nread = class->read_fn (stream,
1350                           op->buffer, op->count,
1351                           g_task_get_cancellable (task),
1352                           &error);
1353   if (nread == -1)
1354     g_task_return_error (task, error);
1355   else
1356     g_task_return_int (task, nread);
1357 }
1358
1359 static void read_async_pollable (GPollableInputStream *stream,
1360                                  GTask                *task);
1361
1362 static gboolean
1363 read_async_pollable_ready (GPollableInputStream *stream,
1364                            gpointer              user_data)
1365 {
1366   GTask *task = user_data;
1367
1368   read_async_pollable (stream, task);
1369   return FALSE;
1370 }
1371
1372 static void
1373 read_async_pollable (GPollableInputStream *stream,
1374                      GTask                *task)
1375 {
1376   ReadData *op = g_task_get_task_data (task);
1377   GError *error = NULL;
1378   gssize nread;
1379
1380   if (g_task_return_error_if_cancelled (task))
1381     return;
1382
1383   nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1384     read_nonblocking (stream, op->buffer, op->count, &error);
1385
1386   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1387     {
1388       GSource *source;
1389
1390       g_error_free (error);
1391
1392       source = g_pollable_input_stream_create_source (stream,
1393                                                       g_task_get_cancellable (task));
1394       g_task_attach_source (task, source,
1395                             (GSourceFunc) read_async_pollable_ready);
1396       g_source_unref (source);
1397       return;
1398     }
1399
1400   if (nread == -1)
1401     g_task_return_error (task, error);
1402   else
1403     g_task_return_int (task, nread);
1404   /* g_input_stream_real_read_async() unrefs task */
1405 }
1406
1407
1408 static void
1409 g_input_stream_real_read_async (GInputStream        *stream,
1410                                 void                *buffer,
1411                                 gsize                count,
1412                                 int                  io_priority,
1413                                 GCancellable        *cancellable,
1414                                 GAsyncReadyCallback  callback,
1415                                 gpointer             user_data)
1416 {
1417   GTask *task;
1418   ReadData *op;
1419   
1420   op = g_slice_new0 (ReadData);
1421   task = g_task_new (stream, cancellable, callback, user_data);
1422   g_task_set_source_tag (task, g_input_stream_real_read_async);
1423   g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1424   g_task_set_priority (task, io_priority);
1425   op->buffer = buffer;
1426   op->count = count;
1427
1428   if (!g_input_stream_async_read_is_via_threads (stream))
1429     read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1430   else
1431     g_task_run_in_thread (task, read_async_thread);
1432   g_object_unref (task);
1433 }
1434
1435 static gssize
1436 g_input_stream_real_read_finish (GInputStream  *stream,
1437                                  GAsyncResult  *result,
1438                                  GError       **error)
1439 {
1440   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1441
1442   return g_task_propagate_int (G_TASK (result), error);
1443 }
1444
1445
1446 static void
1447 skip_async_thread (GTask        *task,
1448                    gpointer      source_object,
1449                    gpointer      task_data,
1450                    GCancellable *cancellable)
1451 {
1452   GInputStream *stream = source_object;
1453   gsize count = GPOINTER_TO_SIZE (task_data);
1454   GInputStreamClass *class;
1455   GError *error = NULL;
1456   gssize ret;
1457
1458   class = G_INPUT_STREAM_GET_CLASS (stream);
1459   ret = class->skip (stream, count,
1460                      g_task_get_cancellable (task),
1461                      &error);
1462   if (ret == -1)
1463     g_task_return_error (task, error);
1464   else
1465     g_task_return_int (task, ret);
1466 }
1467
1468 typedef struct {
1469   char buffer[8192];
1470   gsize count;
1471   gsize count_skipped;
1472 } SkipFallbackAsyncData;
1473
1474 static void
1475 skip_callback_wrapper (GObject      *source_object,
1476                        GAsyncResult *res,
1477                        gpointer      user_data)
1478 {
1479   GInputStreamClass *class;
1480   GTask *task = user_data;
1481   SkipFallbackAsyncData *data = g_task_get_task_data (task);
1482   GError *error = NULL;
1483   gssize ret;
1484
1485   ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1486
1487   if (ret > 0)
1488     {
1489       data->count -= ret;
1490       data->count_skipped += ret;
1491
1492       if (data->count > 0)
1493         {
1494           class = G_INPUT_STREAM_GET_CLASS (source_object);
1495           class->read_async (G_INPUT_STREAM (source_object),
1496                              data->buffer, MIN (8192, data->count),
1497                              g_task_get_priority (task),
1498                              g_task_get_cancellable (task),
1499                              skip_callback_wrapper, task);
1500           return;
1501         }
1502     }
1503
1504   if (ret == -1 &&
1505       g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1506       data->count_skipped)
1507     {
1508       /* No error, return partial read */
1509       g_clear_error (&error);
1510     }
1511
1512   if (error)
1513     g_task_return_error (task, error);
1514   else
1515     g_task_return_int (task, data->count_skipped);
1516   g_object_unref (task);
1517  }
1518
1519 static void
1520 g_input_stream_real_skip_async (GInputStream        *stream,
1521                                 gsize                count,
1522                                 int                  io_priority,
1523                                 GCancellable        *cancellable,
1524                                 GAsyncReadyCallback  callback,
1525                                 gpointer             user_data)
1526 {
1527   GInputStreamClass *class;
1528   SkipFallbackAsyncData *data;
1529   GTask *task;
1530
1531   class = G_INPUT_STREAM_GET_CLASS (stream);
1532
1533   task = g_task_new (stream, cancellable, callback, user_data);
1534   g_task_set_source_tag (task, g_input_stream_real_skip_async);
1535   g_task_set_priority (task, io_priority);
1536
1537   if (g_input_stream_async_read_is_via_threads (stream))
1538     {
1539       /* Read is thread-using async fallback.
1540        * Make skip use threads too, so that we can use a possible sync skip
1541        * implementation. */
1542       g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1543
1544       g_task_run_in_thread (task, skip_async_thread);
1545       g_object_unref (task);
1546     }
1547   else
1548     {
1549       /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1550       
1551       /* There is a custom async read function, lets use that. */
1552       data = g_new (SkipFallbackAsyncData, 1);
1553       data->count = count;
1554       data->count_skipped = 0;
1555       g_task_set_task_data (task, data, g_free);
1556       g_task_set_check_cancellable (task, FALSE);
1557       class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1558                          skip_callback_wrapper, task);
1559     }
1560
1561 }
1562
1563 static gssize
1564 g_input_stream_real_skip_finish (GInputStream  *stream,
1565                                  GAsyncResult  *result,
1566                                  GError       **error)
1567 {
1568   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1569
1570   return g_task_propagate_int (G_TASK (result), error);
1571 }
1572
1573 static void
1574 close_async_thread (GTask        *task,
1575                     gpointer      source_object,
1576                     gpointer      task_data,
1577                     GCancellable *cancellable)
1578 {
1579   GInputStream *stream = source_object;
1580   GInputStreamClass *class;
1581   GError *error = NULL;
1582   gboolean result;
1583
1584   class = G_INPUT_STREAM_GET_CLASS (stream);
1585   if (class->close_fn)
1586     {
1587       result = class->close_fn (stream,
1588                                 g_task_get_cancellable (task),
1589                                 &error);
1590       if (!result)
1591         {
1592           g_task_return_error (task, error);
1593           return;
1594         }
1595     }
1596
1597   g_task_return_boolean (task, TRUE);
1598 }
1599
1600 static void
1601 g_input_stream_real_close_async (GInputStream        *stream,
1602                                  int                  io_priority,
1603                                  GCancellable        *cancellable,
1604                                  GAsyncReadyCallback  callback,
1605                                  gpointer             user_data)
1606 {
1607   GTask *task;
1608
1609   task = g_task_new (stream, cancellable, callback, user_data);
1610   g_task_set_source_tag (task, g_input_stream_real_close_async);
1611   g_task_set_check_cancellable (task, FALSE);
1612   g_task_set_priority (task, io_priority);
1613   
1614   g_task_run_in_thread (task, close_async_thread);
1615   g_object_unref (task);
1616 }
1617
1618 static gboolean
1619 g_input_stream_real_close_finish (GInputStream  *stream,
1620                                   GAsyncResult  *result,
1621                                   GError       **error)
1622 {
1623   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1624
1625   return g_task_propagate_boolean (G_TASK (result), error);
1626 }